Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 10 additions & 30 deletions src/dvsim/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,24 @@

"""DVSim Scheduler Instrumentation."""

from dvsim.instrumentation.base import (
InstrumentationAggregator,
InstrumentationFragment,
InstrumentationFragments,
JobFragment,
SchedulerFragment,
SchedulerInstrumentation,
)
from dvsim.instrumentation.base import InstrumentationAggregator, SchedulerInstrumentation
from dvsim.instrumentation.factory import InstrumentationFactory
from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment
from dvsim.instrumentation.resources import (
ResourceInstrumentation,
ResourceJobFragment,
ResourceSchedulerFragment,
from dvsim.instrumentation.records import (
InstrumentationMetrics,
InstrumentationResults,
JobMetrics,
SchedulerMetrics,
)
from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path
from dvsim.instrumentation.timing import (
TimingInstrumentation,
TimingJobFragment,
TimingSchedulerFragment,
)

__all__ = (
"InstrumentationAggregator",
"InstrumentationFactory",
"InstrumentationFragment",
"InstrumentationFragments",
"JobFragment",
"MetadataInstrumentation",
"MetadataJobFragment",
"ResourceInstrumentation",
"ResourceJobFragment",
"ResourceSchedulerFragment",
"SchedulerFragment",
"InstrumentationMetrics",
"InstrumentationResults",
"JobMetrics",
"SchedulerInstrumentation",
"TimingInstrumentation",
"TimingJobFragment",
"TimingSchedulerFragment",
"SchedulerMetrics",
"flush",
"get",
"set_instrumentation",
Expand Down
109 changes: 37 additions & 72 deletions src/dvsim/instrumentation/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,35 @@

"""DVSim scheduler instrumentation base classes."""

from collections.abc import Iterable, Sequence
from dataclasses import asdict, dataclass
from typing import Any, TypeAlias

from collections import defaultdict
from collections.abc import Iterable, Mapping

from dvsim.instrumentation.records import (
InstrumentationResults,
JobInstrumentationResults,
JobMetrics,
SchedulerInstrumentationResults,
SchedulerMetrics,
)
from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus
from dvsim.logging import log
from dvsim.scheduler.core import Scheduler

__all__ = (
"InstrumentationAggregator",
"InstrumentationFragment",
"InstrumentationFragments",
"JobFragment",
"SchedulerFragment",
"SchedulerInstrumentation",
)


@dataclass
class InstrumentationFragment:
"""Base class for instrumentation reports / report fragments."""

def to_dict(self) -> dict[str, Any]:
"""Convert the report fragment to a dictionary."""
return asdict(self)


@dataclass
class SchedulerFragment(InstrumentationFragment):
"""Base class for instrumentation report fragments related to the scheduler."""


@dataclass
class JobFragment(InstrumentationFragment):
"""Base class for instrumentation report fragments related to individual jobs."""

job: JobSpec


# Each instrumentation object can report any number of information fragments about the
# scheduler and about its jobs.
InstrumentationFragments: TypeAlias = tuple[Sequence[SchedulerFragment], Sequence[JobFragment]]


class SchedulerInstrumentation:
"""Instrumentation for the scheduler.

Base class for scheduler instrumentation, recording a variety of performance and
behavioural metrics for analysis.
"""

@property
def name(self) -> str:
"""The name to use to refer to this instrumentation mechanism."""
return self.__class__.__name__
name: str = ""

def start(self) -> None:
"""Begin instrumentation, starting whatever is needed before the scheduler is run."""
Expand Down Expand Up @@ -89,10 +62,14 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: # noqa
"""Notify instrumentation of a change in status for some scheduled job."""
return

def build_report_fragments(self) -> InstrumentationFragments | None:
"""Build report fragments from the collected instrumentation information."""
def get_scheduler_data(self) -> SchedulerMetrics | None:
"""Retrieve scheduler metrics measured by this instrumentation."""
return None

def get_job_data(self) -> Mapping[str, JobMetrics]:
"""Retrieve per-job metrics measured by this instrumentation."""
return {}


class InstrumentationAggregator:
"""Aggregator for scheduler instrumentation collection, composing multiple instrumentations."""
Expand All @@ -118,40 +95,28 @@ def stop(self) -> None:
for inst in self._instrumentations:
inst.stop()

def collect(self) -> dict[str, Any]:
def collect(self) -> InstrumentationResults:
"""Collect all gathered instrumentation data from the wrapped objects."""
log.info("Collecting instrumentation data...")

scheduler_fragments = []
job_fragments = []

for inst in self._instrumentations:
fragments = inst.build_report_fragments()
if fragments is None:
continue
scheduler_fragments += fragments[0]
job_fragments += fragments[1]
log.info("Collecting instrumentation report data...")

log.info("Finished collecting instrumentation data. Merging instrumentation data...")
scheduler_metrics: dict[str, SchedulerMetrics] = {}
job_metrics: dict[str, dict[str, JobMetrics]] = defaultdict(dict)

scheduler: dict[str, Any] = {}
for i, scheduler_frag in enumerate(scheduler_fragments, start=1):
for i, inst in enumerate(self._instrumentations, start=1):
log.debug(
"Merging instrumentation report scheduler data (%d/%d)", i, len(scheduler_fragments)
"Collecting instrumentation report data (%d/%d)", i, len(self._instrumentations)
)
scheduler.update(scheduler_frag.to_dict())

jobs: dict[tuple[str, str], dict[str, Any]] = {}
for i, job_frag in enumerate(job_fragments, start=1):
log.debug("Merging instrumentation report job data (%d/%d)", i, len(job_fragments))
spec = job_frag.job
# We can uniquely identify jobs from the combination of their full name & target
job_id = (spec.full_name, spec.target)
job = jobs.get(job_id)
if job is None:
job = {}
jobs[job_id] = job
job.update({k: v for k, v in job_frag.to_dict().items() if k != "job"})

log.info("Finished merging instrumentation report data.")
return {"scheduler": scheduler, "jobs": list(jobs.values())}
scheduler_record = inst.get_scheduler_data()
if scheduler_record is not None:
scheduler_metrics[inst.name] = scheduler_record
for job_id, job_record in inst.get_job_data().items():
job_metrics[job_id][inst.name] = job_record

log.info("Finished collecting instrumentation report data.")
return InstrumentationResults(
scheduler=SchedulerInstrumentationResults(**scheduler_metrics), # type: ignore[reportArgumentType]
jobs={
job_id: JobInstrumentationResults(**job_data) # type: ignore[reportArgumentType]
for job_id, job_data in job_metrics.items()
},
)
57 changes: 18 additions & 39 deletions src/dvsim/instrumentation/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,14 @@

"""DVSim scheduler instrumentation metadata (to be included in the generated report)."""

from dataclasses import dataclass
from collections.abc import Mapping

from dvsim.instrumentation.base import (
InstrumentationFragments,
JobFragment,
SchedulerInstrumentation,
)
from dvsim.instrumentation.base import SchedulerInstrumentation
from dvsim.instrumentation.records import JobInstrumentationMetadata
from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus

__all__ = (
"MetadataInstrumentation",
"MetadataJobFragment",
)


@dataclass
class MetadataJobFragment(JobFragment):
"""Instrumentation metadata for scheduled jobs, reporting the final status of the job."""

name: str
full_name: str
job_type: str
target: str
tool: str
backend: str | None
dependencies: list[str]
status: str
__all__ = ("MetadataInstrumentation",)


class MetadataInstrumentation(SchedulerInstrumentation):
Expand All @@ -41,6 +21,8 @@ class MetadataInstrumentation(SchedulerInstrumentation):
part of the instrumentation report for analysis, regardless of other instrumentations.
"""

name = "meta"

def __init__(self) -> None:
"""Construct a `MetadataInstrumentation`."""
super().__init__()
Expand All @@ -51,20 +33,17 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
status_str = status.name.capitalize()
self._jobs[job.id] = (job, status_str)

def build_report_fragments(self) -> InstrumentationFragments | None:
"""Build report fragments from the collected instrumentation information."""
jobs = [
MetadataJobFragment(
spec,
spec.name,
spec.full_name,
spec.job_type,
spec.target,
spec.tool.name,
spec.backend,
spec.dependencies,
status_str,
def get_job_data(self) -> Mapping[str, JobInstrumentationMetadata]:
"""Retrieve per-job metrics measured by this instrumentation."""
return {
spec.id: JobInstrumentationMetadata(
name=spec.name,
job_type=spec.job_type,
target=spec.target,
tool=spec.tool.name,
backend=spec.backend,
dependencies=list(spec.dependencies),
status=status_str,
)
for spec, status_str in self._jobs.values()
]
return ([], jobs)
}
Loading
Loading