From b46bee0e8e7dde3cfb0e2aea162b15b2a3dcbb56 Mon Sep 17 00:00:00 2001 From: Tim-Oliver Husser Date: Fri, 29 May 2026 14:25:15 +0200 Subject: [PATCH 1/6] started working on default script --- pyobs/robotic/scripts/default.py | 180 +++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 pyobs/robotic/scripts/default.py diff --git a/pyobs/robotic/scripts/default.py b/pyobs/robotic/scripts/default.py new file mode 100644 index 00000000..f5f4059d --- /dev/null +++ b/pyobs/robotic/scripts/default.py @@ -0,0 +1,180 @@ +import asyncio +import logging +from pydantic import model_validator, PrivateAttr +from typing import Any, cast, Literal, Self + +from pyobs.interfaces import ( + IRoof, + IAutoGuiding, + ITelescope, + IAcquisition, + ICamera, + IFilters, + IPointingRaDec, +) +from pyobs.robotic.scripts import Script +from pyobs.robotic.task import TaskData +from pyobs.utils.enums import ImageType +import pyobs.utils.exceptions as exc +from pyobs.utils.logger import DuplicateFilter +from pyobs.utils.parallel import Future + +log = logging.getLogger(__name__) + +# logger for logging name of task +cannot_run_logger = logging.getLogger(__name__ + ":cannot_run") +cannot_run_logger.addFilter(DuplicateFilter()) + + +class DefaultScript(Script): + """Default script for imaging configs.""" + + image_type: Literal["BIAS", "DARK", "OBJECT"] = "OBJECT" + + camera: str + roof: str | None = None + telescope: str | None = None + filters: str | None = None + autoguider: str | None = None + acquisition: str | None = None + + _image_type: ImageType = PrivateAttr() + _roof: IRoof | None = PrivateAttr() + _telescope: ITelescope | None = PrivateAttr() + _camera: ICamera | None = PrivateAttr() + _filters: IFilters | None = PrivateAttr() + _autoguider: IAutoGuiding | None = PrivateAttr() + _acquisition: IAcquisition | None = PrivateAttr() + + @model_validator(mode="after") + def set_image_type(self) -> Self: + if self.image_type == "BIAS": + self._image_type = ImageType.BIAS + elif self.image_type == "DARK": + self._image_type = ImageType.DARK + else: + self._image_type = ImageType.OBJECT + return self + + @model_validator(mode="after") + async def _get_proxies(self) -> Self: + self._roof = await self.comm.safe_proxy(self.roof, IRoof) + self._telescope = await self.comm.safe_proxy(self.telescope, ITelescope) + self._camera = await self.comm.safe_proxy(self.camera, ICamera) + self._filters = await self.comm.safe_proxy(self.filters, IFilters) + self._autoguider = await self.comm.safe_proxy(self.autoguider, IAutoGuiding) + self._acquisition = await self.comm.safe_proxy(self.acquisition, IAcquisition) + + async def can_run(self, data: TaskData | None) -> bool: + """Whether this config can currently run. + + Returns: + True, if the script can run now + """ + + # need camera + if self._camera is None: + cannot_run_logger.info("Cannot run task, no camera found.") + return False + + # for OBJECT exposure we need more + if self._image_type == ImageType.OBJECT: + # we need an open roof and a working telescope + if self._roof is None or not await self._roof.is_ready(): + cannot_run_logger.warning("Cannot run task, no roof found or roof not ready.") + return False + if self._telescope is None or not await self._telescope.is_ready(): + cannot_run_logger.warning("Cannot run task, no telescope found or telescope not ready.") + return False + + # we probably need filters and autoguider/acquisition + if self._filters is None: + cannot_run_logger.warning("Cannot run task, No filter module found.") + return False + + """ + # acquisition? + cfg = self.request.configurations[0] + if cfg.acquisition_config is not None and cfg.acquisition_config.mode == "ON" and acquisition is None: + cannot_run_logger.warning("Cannot run task, no acquisition found.") + return False + + # guiding? + if cfg.guiding_config is not None and cfg.guiding_config.mode == "ON" and autoguider is None: + cannot_run_logger.warning("Cannot run task, no auto guider found.") + return False + """ + + # seems alright + return True + + async def run(self, data: TaskData | None) -> None: + """Run script. + + Raises: + InterruptedError: If interrupted + """ + + # got a target? + track: Future | asyncio.Task[Any] = Future(empty=True) + if self._image_type == ImageType.OBJECT: + if self._telescope is None: + raise ValueError("No telescope given.") + log.info("Moving to target %s...", data.task.target.name) + if isinstance(self._telescope, IPointingRaDec): + track = asyncio.create_task(self._telescope.move_radec(data.task.target.ra, data.task.target.dec)) + else: + raise exc.MotionError("Telescope can't move to RA/Dec.") + + """ + # acquisition? + if cfg.acquisition_config is not None and cfg.acquisition_config.mode == "ON": + # wait for track + await track + + # do acquisition + if acquisition is None: + raise ValueError("No acquisition given.") + log.info("Performing acquisition...") + await acquisition.acquire_target() + + # guiding? + if cfg.guiding_config is not None and cfg.guiding_config.mode == "ON": + if autoguider is None: + raise ValueError("No autoguider given.") + log.info("Starting auto-guiding...") + await autoguider.start() + """ + + # total (exposure) time done in this config + self.exptime_done = 0.0 + + # finally, stop telescope + if self._image_type == ImageType.OBJECT: + log.info("Stopping telescope...") + await cast(ITelescope, self._telescope).stop_motion() + + def get_fits_headers(self, namespaces: list[str] | None = None) -> dict[str, Any]: + """Returns FITS header for the current status of this module. + + Args: + namespaces: If given, only return FITS headers for the given namespaces. + + Returns: + Dictionary containing FITS headers. + """ + + # init header + hdr = {} + + # which image type? + if self._image_type == ImageType.OBJECT: + # add object name + # hdr["OBJECT"] = self.request.configurations[0].target.name, "Name of target" + pass + + # return + return hdr + + +__all__ = ["DefaultScript"] From d1e765c36540fdb16835c8a22eb6bc8ce4bf923c Mon Sep 17 00:00:00 2001 From: Tim-Oliver Husser Date: Fri, 29 May 2026 15:57:42 +0200 Subject: [PATCH 2/6] finished default imaging script --- pyobs/robotic/scripts/__init__.py | 1 + pyobs/robotic/scripts/default.py | 180 -------------------- pyobs/robotic/scripts/imaging.py | 267 ++++++++++++++++++++++++++++++ 3 files changed, 268 insertions(+), 180 deletions(-) delete mode 100644 pyobs/robotic/scripts/default.py create mode 100644 pyobs/robotic/scripts/imaging.py diff --git a/pyobs/robotic/scripts/__init__.py b/pyobs/robotic/scripts/__init__.py index 429ed677..a577ca6e 100644 --- a/pyobs/robotic/scripts/__init__.py +++ b/pyobs/robotic/scripts/__init__.py @@ -10,3 +10,4 @@ from .selector import SelectorScript from .sequential import SequentialRunner from .skyflats import SkyFlats +from .imaging import ImagingScript diff --git a/pyobs/robotic/scripts/default.py b/pyobs/robotic/scripts/default.py deleted file mode 100644 index f5f4059d..00000000 --- a/pyobs/robotic/scripts/default.py +++ /dev/null @@ -1,180 +0,0 @@ -import asyncio -import logging -from pydantic import model_validator, PrivateAttr -from typing import Any, cast, Literal, Self - -from pyobs.interfaces import ( - IRoof, - IAutoGuiding, - ITelescope, - IAcquisition, - ICamera, - IFilters, - IPointingRaDec, -) -from pyobs.robotic.scripts import Script -from pyobs.robotic.task import TaskData -from pyobs.utils.enums import ImageType -import pyobs.utils.exceptions as exc -from pyobs.utils.logger import DuplicateFilter -from pyobs.utils.parallel import Future - -log = logging.getLogger(__name__) - -# logger for logging name of task -cannot_run_logger = logging.getLogger(__name__ + ":cannot_run") -cannot_run_logger.addFilter(DuplicateFilter()) - - -class DefaultScript(Script): - """Default script for imaging configs.""" - - image_type: Literal["BIAS", "DARK", "OBJECT"] = "OBJECT" - - camera: str - roof: str | None = None - telescope: str | None = None - filters: str | None = None - autoguider: str | None = None - acquisition: str | None = None - - _image_type: ImageType = PrivateAttr() - _roof: IRoof | None = PrivateAttr() - _telescope: ITelescope | None = PrivateAttr() - _camera: ICamera | None = PrivateAttr() - _filters: IFilters | None = PrivateAttr() - _autoguider: IAutoGuiding | None = PrivateAttr() - _acquisition: IAcquisition | None = PrivateAttr() - - @model_validator(mode="after") - def set_image_type(self) -> Self: - if self.image_type == "BIAS": - self._image_type = ImageType.BIAS - elif self.image_type == "DARK": - self._image_type = ImageType.DARK - else: - self._image_type = ImageType.OBJECT - return self - - @model_validator(mode="after") - async def _get_proxies(self) -> Self: - self._roof = await self.comm.safe_proxy(self.roof, IRoof) - self._telescope = await self.comm.safe_proxy(self.telescope, ITelescope) - self._camera = await self.comm.safe_proxy(self.camera, ICamera) - self._filters = await self.comm.safe_proxy(self.filters, IFilters) - self._autoguider = await self.comm.safe_proxy(self.autoguider, IAutoGuiding) - self._acquisition = await self.comm.safe_proxy(self.acquisition, IAcquisition) - - async def can_run(self, data: TaskData | None) -> bool: - """Whether this config can currently run. - - Returns: - True, if the script can run now - """ - - # need camera - if self._camera is None: - cannot_run_logger.info("Cannot run task, no camera found.") - return False - - # for OBJECT exposure we need more - if self._image_type == ImageType.OBJECT: - # we need an open roof and a working telescope - if self._roof is None or not await self._roof.is_ready(): - cannot_run_logger.warning("Cannot run task, no roof found or roof not ready.") - return False - if self._telescope is None or not await self._telescope.is_ready(): - cannot_run_logger.warning("Cannot run task, no telescope found or telescope not ready.") - return False - - # we probably need filters and autoguider/acquisition - if self._filters is None: - cannot_run_logger.warning("Cannot run task, No filter module found.") - return False - - """ - # acquisition? - cfg = self.request.configurations[0] - if cfg.acquisition_config is not None and cfg.acquisition_config.mode == "ON" and acquisition is None: - cannot_run_logger.warning("Cannot run task, no acquisition found.") - return False - - # guiding? - if cfg.guiding_config is not None and cfg.guiding_config.mode == "ON" and autoguider is None: - cannot_run_logger.warning("Cannot run task, no auto guider found.") - return False - """ - - # seems alright - return True - - async def run(self, data: TaskData | None) -> None: - """Run script. - - Raises: - InterruptedError: If interrupted - """ - - # got a target? - track: Future | asyncio.Task[Any] = Future(empty=True) - if self._image_type == ImageType.OBJECT: - if self._telescope is None: - raise ValueError("No telescope given.") - log.info("Moving to target %s...", data.task.target.name) - if isinstance(self._telescope, IPointingRaDec): - track = asyncio.create_task(self._telescope.move_radec(data.task.target.ra, data.task.target.dec)) - else: - raise exc.MotionError("Telescope can't move to RA/Dec.") - - """ - # acquisition? - if cfg.acquisition_config is not None and cfg.acquisition_config.mode == "ON": - # wait for track - await track - - # do acquisition - if acquisition is None: - raise ValueError("No acquisition given.") - log.info("Performing acquisition...") - await acquisition.acquire_target() - - # guiding? - if cfg.guiding_config is not None and cfg.guiding_config.mode == "ON": - if autoguider is None: - raise ValueError("No autoguider given.") - log.info("Starting auto-guiding...") - await autoguider.start() - """ - - # total (exposure) time done in this config - self.exptime_done = 0.0 - - # finally, stop telescope - if self._image_type == ImageType.OBJECT: - log.info("Stopping telescope...") - await cast(ITelescope, self._telescope).stop_motion() - - def get_fits_headers(self, namespaces: list[str] | None = None) -> dict[str, Any]: - """Returns FITS header for the current status of this module. - - Args: - namespaces: If given, only return FITS headers for the given namespaces. - - Returns: - Dictionary containing FITS headers. - """ - - # init header - hdr = {} - - # which image type? - if self._image_type == ImageType.OBJECT: - # add object name - # hdr["OBJECT"] = self.request.configurations[0].target.name, "Name of target" - pass - - # return - return hdr - - -__all__ = ["DefaultScript"] diff --git a/pyobs/robotic/scripts/imaging.py b/pyobs/robotic/scripts/imaging.py new file mode 100644 index 00000000..d4e244a4 --- /dev/null +++ b/pyobs/robotic/scripts/imaging.py @@ -0,0 +1,267 @@ +import asyncio +import logging +from pydantic import model_validator, PrivateAttr, BaseModel, Field +from typing import Any, cast, Self + +from pyobs.interfaces import ( + IAutoGuiding, + ITelescope, + IAcquisition, + ICamera, + IFilters, + IPointingRaDec, + IBinning, + IWindow, + IExposureTime, + IImageType, +) +from pyobs.robotic.scheduler.targets import SiderealTarget +from pyobs.robotic.scripts import Script +from pyobs.robotic.task import TaskData +from pyobs.utils.enums import ImageType, MotionStatus +import pyobs.utils.exceptions as exc +from pyobs.utils.logger import DuplicateFilter +from pyobs.utils.parallel import Future + +log = logging.getLogger(__name__) + +# logger for logging name of task +cannot_run_logger = logging.getLogger(__name__ + ":cannot_run") +cannot_run_logger.addFilter(DuplicateFilter()) + + +class AcquisitionConfig(BaseModel): + enabled: bool = True + optional: bool = False + + +class GuidingConfig(BaseModel): + enabled: bool = True + optional: bool = False + + +class InstrumentConfig(BaseModel): + exposure_time: float + count: int = 1 + image_type: ImageType = ImageType.OBJECT + binning: tuple[int, int] = (1, 1) + window: tuple[int, int, int, int] | None = None + optical_filter: str | None + + +class Configuration(BaseModel): + acquisition_config: AcquisitionConfig + guiding_config: GuidingConfig + instrument_configs: list[InstrumentConfig] = Field(default_factory=list) + repeats: int = 1 + + +class ImagingScript(Script): + """Default script for imaging configs.""" + + configuration = Configuration + + camera: str + telescope: str | None = None + filters: str | None = None + autoguider: str | None = None + acquisition: str | None = None + + _telescope: ITelescope | None = PrivateAttr() + _camera: ICamera | None = PrivateAttr() + _filters: IFilters | None = PrivateAttr() + _autoguider: IAutoGuiding | None = PrivateAttr() + _acquisition: IAcquisition | None = PrivateAttr() + + _object_name: str | None = PrivateAttr(default=None) + + @model_validator(mode="after") + async def _get_proxies(self) -> Self: + self._telescope = await self.comm.safe_proxy(self.telescope, ITelescope) + self._camera = await self.comm.safe_proxy(self.camera, ICamera) + self._filters = await self.comm.safe_proxy(self.filters, IFilters) + self._autoguider = await self.comm.safe_proxy(self.autoguider, IAutoGuiding) + self._acquisition = await self.comm.safe_proxy(self.acquisition, IAcquisition) + return self + + def _image_types(self) -> list[ImageType]: + return list(set([instr.image_type for instr in self.configuration.instrument_configs])) + + def _optical_filters(self) -> list[str]: + return list( + set( + [ + instr.optical_filter + for instr in self.configuration.instrument_configs + if instr.optical_filter is not None + ] + ) + ) + + async def can_run(self, data: TaskData | None) -> bool: + """Whether this config can currently run. + + Returns: + True, if the script can run now + """ + + # need camera + if self._camera is None: + cannot_run_logger.info("Cannot run task, no camera found.") + return False + + # for OBJECT exposure we need more + if ImageType.OBJECT in self._image_types(): + # we need a working telescope + if self._telescope is None or not await self._telescope.is_ready(): + cannot_run_logger.warning("Cannot run task, no telescope found or telescope not ready.") + return False + + # we probably need filters and autoguider/acquisition + if len(self._optical_filters()) > 0 or self._filters is None: + cannot_run_logger.warning("Cannot run task, No filter module found.") + return False + + # acquisition? + if self.configuration.acquisition_config.enabled and self._acquisition is None: + cannot_run_logger.warning("Cannot run task, no acquisition found.") + return False + + # guiding? + if self.configuration.guiding_config.enabled and self._autoguider is None: + cannot_run_logger.warning("Cannot run task, no auto guider found.") + return False + + # seems alright + return True + + async def run(self, data: TaskData | None) -> None: + """Run script. + + Raises: + InterruptedError: If interrupted + """ + + # got a target? + target = data.task.target if data is not None and data.task is not None else None + track: Future | asyncio.Task[Any] = Future(empty=True) + if ImageType.OBJECT in self._image_types() and target is not None: + if self._telescope is None: + raise ValueError("No telescope given.") + log.info("Moving to target %s...", target.name) + if isinstance(self._telescope, IPointingRaDec): + if isinstance(target, SiderealTarget): + track = asyncio.create_task(self._telescope.move_radec(target.ra, target.dec)) + raise exc.MotionError("Only sidereal targets allowed.") + else: + raise exc.MotionError("Telescope can't move to RA/Dec.") + + # acquisition? + if self.configuration.acquisition_config.enabled: + # wait for track + await track + + # do acquisition + if self._acquisition is None: + raise ValueError("No acquisition given.") + log.info("Performing acquisition...") + try: + await self._acquisition.acquire_target() + except: + if self.configuration.acquisition_config.optional: + log.warning("Could not acquire target, will continue without.") + else: + raise + + # guiding? + if self.configuration.guiding_config.enabled: + if self._autoguider is None: + raise ValueError("No autoguider given.") + log.info("Starting auto-guiding...") + await self._autoguider.start() + + # total (exposure) time done in this config + self.exptime_done = 0.0 + + # repeat configuration + for repeat in range(self.configuration.repeats): + log.info(f"Starting configuration repeat {repeat+1}/{self.configuration.repeats}...") + + # loop instrument configs + for instrument_config in self.configuration.instrument_configs: + if isinstance(self._camera, IBinning): + log.info(f"Setting binning to {instrument_config.binning[0]}x{instrument_config.binning[1]}...") + await self._camera.set_binning(*instrument_config.binning) + + if isinstance(self._camera, IWindow): + wnd = instrument_config.window + if wnd is None: + wnd = await self._camera.get_full_frame() + log.info(f"Setting window to {wnd[2]}x{wnd[3]} at {wnd[0]},{wnd[1]}...") + await self._camera.set_window(*wnd) + + if isinstance(self._camera, IExposureTime): + log.info(f"Setting exposure time to {instrument_config.exposure_time}s...") + await self._camera.set_exposure_time(instrument_config.exposure_time) + + # set image type + if isinstance(self._camera, IImageType): + log.info(f"Setting image type to {instrument_config.image_type}...") + await self._camera.set_image_type(instrument_config.image_type) + + set_filter: Future | asyncio.Task[Any] = Future(empty=True) + if instrument_config.optical_filter is not None and self._filters is not None: + log.info(f"Setting filter to {instrument_config.optical_filter}...") + set_filter = asyncio.create_task(self._filters.set_filter(instrument_config.optical_filter)) + + # wait for tracking and filter + await Future.wait_all([track, set_filter]) + + # set object name? + if instrument_config.image_type == ImageType.OBJECT and target is not None: + self._object_name = target.name + + # do repeats + for repeat2 in range(instrument_config.count): + log.info(f"Exposing image {repeat2+1}/{instrument_config.count}...") + + # grab image + await cast(ICamera, self._camera).grab_data() + self.exptime_done += instrument_config.exposure_time + + # reset object name + self._object_name = None + + # stop auto guiding + if self._autoguider is not None and self.configuration.guiding_config.enabled: + log.info("Stopping auto-guiding...") + await self._autoguider.stop() + + # finally, stop telescope + if await cast(ITelescope, self._telescope).get_motion_status() != MotionStatus.IDLE: + log.info("Stopping telescope...") + await cast(ITelescope, self._telescope).stop_motion() + + def get_fits_headers(self, namespaces: list[str] | None = None) -> dict[str, Any]: + """Returns FITS header for the current status of this module. + + Args: + namespaces: If given, only return FITS headers for the given namespaces. + + Returns: + Dictionary containing FITS headers. + """ + + # init header + hdr = {} + + # which image type? + if self._object_name is not None: + # add object name + hdr["OBJECT"] = self._object_name, "Name of target" + + # return + return hdr + + +__all__ = ["ImagingScript"] From b817d032cb243c1abda1263629e97d65080f0d80 Mon Sep 17 00:00:00 2001 From: Tim-Oliver Husser Date: Fri, 29 May 2026 21:16:47 +0200 Subject: [PATCH 3/6] fixed some bugs --- pyobs/robotic/scripts/imaging.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/pyobs/robotic/scripts/imaging.py b/pyobs/robotic/scripts/imaging.py index d4e244a4..4477af1d 100644 --- a/pyobs/robotic/scripts/imaging.py +++ b/pyobs/robotic/scripts/imaging.py @@ -46,7 +46,7 @@ class InstrumentConfig(BaseModel): image_type: ImageType = ImageType.OBJECT binning: tuple[int, int] = (1, 1) window: tuple[int, int, int, int] | None = None - optical_filter: str | None + optical_filter: str | None = None class Configuration(BaseModel): @@ -59,7 +59,7 @@ class Configuration(BaseModel): class ImagingScript(Script): """Default script for imaging configs.""" - configuration = Configuration + configuration: Configuration camera: str telescope: str | None = None @@ -75,14 +75,12 @@ class ImagingScript(Script): _object_name: str | None = PrivateAttr(default=None) - @model_validator(mode="after") - async def _get_proxies(self) -> Self: + async def _get_proxies(self) -> None: self._telescope = await self.comm.safe_proxy(self.telescope, ITelescope) self._camera = await self.comm.safe_proxy(self.camera, ICamera) self._filters = await self.comm.safe_proxy(self.filters, IFilters) self._autoguider = await self.comm.safe_proxy(self.autoguider, IAutoGuiding) self._acquisition = await self.comm.safe_proxy(self.acquisition, IAcquisition) - return self def _image_types(self) -> list[ImageType]: return list(set([instr.image_type for instr in self.configuration.instrument_configs])) @@ -104,6 +102,7 @@ async def can_run(self, data: TaskData | None) -> bool: Returns: True, if the script can run now """ + await self._get_proxies() # need camera if self._camera is None: @@ -118,7 +117,7 @@ async def can_run(self, data: TaskData | None) -> bool: return False # we probably need filters and autoguider/acquisition - if len(self._optical_filters()) > 0 or self._filters is None: + if len(self._optical_filters()) > 0 and self._filters is None: cannot_run_logger.warning("Cannot run task, No filter module found.") return False @@ -141,6 +140,7 @@ async def run(self, data: TaskData | None) -> None: Raises: InterruptedError: If interrupted """ + await self._get_proxies() # got a target? target = data.task.target if data is not None and data.task is not None else None @@ -152,18 +152,20 @@ async def run(self, data: TaskData | None) -> None: if isinstance(self._telescope, IPointingRaDec): if isinstance(target, SiderealTarget): track = asyncio.create_task(self._telescope.move_radec(target.ra, target.dec)) - raise exc.MotionError("Only sidereal targets allowed.") + else: + raise exc.MotionError("Only sidereal targets allowed.") else: raise exc.MotionError("Telescope can't move to RA/Dec.") # acquisition? if self.configuration.acquisition_config.enabled: + if self._acquisition is None: + raise ValueError("No acquisition given.") + # wait for track await track # do acquisition - if self._acquisition is None: - raise ValueError("No acquisition given.") log.info("Performing acquisition...") try: await self._acquisition.acquire_target() @@ -177,6 +179,11 @@ async def run(self, data: TaskData | None) -> None: if self.configuration.guiding_config.enabled: if self._autoguider is None: raise ValueError("No autoguider given.") + + # wait for track + await track + + # start auto-guiding log.info("Starting auto-guiding...") await self._autoguider.start() @@ -238,7 +245,10 @@ async def run(self, data: TaskData | None) -> None: await self._autoguider.stop() # finally, stop telescope - if await cast(ITelescope, self._telescope).get_motion_status() != MotionStatus.IDLE: + if ( + self._telescope is not None + and await cast(ITelescope, self._telescope).get_motion_status() != MotionStatus.IDLE + ): log.info("Stopping telescope...") await cast(ITelescope, self._telescope).stop_motion() From cc41fbe253c17e967af568c56e9080542d13aa6d Mon Sep 17 00:00:00 2001 From: Tim-Oliver Husser Date: Fri, 29 May 2026 21:21:30 +0200 Subject: [PATCH 4/6] fixed some bugs --- pyobs/robotic/scripts/imaging.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/pyobs/robotic/scripts/imaging.py b/pyobs/robotic/scripts/imaging.py index 4477af1d..29467e66 100644 --- a/pyobs/robotic/scripts/imaging.py +++ b/pyobs/robotic/scripts/imaging.py @@ -1,7 +1,7 @@ import asyncio import logging -from pydantic import model_validator, PrivateAttr, BaseModel, Field -from typing import Any, cast, Self +from pydantic import PrivateAttr, BaseModel, Field +from typing import Any, cast from pyobs.interfaces import ( IAutoGuiding, @@ -50,8 +50,8 @@ class InstrumentConfig(BaseModel): class Configuration(BaseModel): - acquisition_config: AcquisitionConfig - guiding_config: GuidingConfig + acquisition_config: AcquisitionConfig = Field(default_factory=AcquisitionConfig) + guiding_config: GuidingConfig = Field(default_factory=GuidingConfig) instrument_configs: list[InstrumentConfig] = Field(default_factory=list) repeats: int = 1 @@ -67,11 +67,11 @@ class ImagingScript(Script): autoguider: str | None = None acquisition: str | None = None - _telescope: ITelescope | None = PrivateAttr() - _camera: ICamera | None = PrivateAttr() - _filters: IFilters | None = PrivateAttr() - _autoguider: IAutoGuiding | None = PrivateAttr() - _acquisition: IAcquisition | None = PrivateAttr() + _telescope: ITelescope | None = PrivateAttr(default=None) + _camera: ICamera | None = PrivateAttr(default=None) + _filters: IFilters | None = PrivateAttr(default=None) + _autoguider: IAutoGuiding | None = PrivateAttr(default=None) + _acquisition: IAcquisition | None = PrivateAttr(default=None) _object_name: str | None = PrivateAttr(default=None) @@ -140,7 +140,6 @@ async def run(self, data: TaskData | None) -> None: Raises: InterruptedError: If interrupted """ - await self._get_proxies() # got a target? target = data.task.target if data is not None and data.task is not None else None From 21e2cd5b6586eec0219f407eb9129d54aa3b3663 Mon Sep 17 00:00:00 2001 From: Tim-Oliver Husser Date: Fri, 29 May 2026 21:24:51 +0200 Subject: [PATCH 5/6] fixed some bugs --- pyobs/robotic/scripts/imaging.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyobs/robotic/scripts/imaging.py b/pyobs/robotic/scripts/imaging.py index 29467e66..36641f7d 100644 --- a/pyobs/robotic/scripts/imaging.py +++ b/pyobs/robotic/scripts/imaging.py @@ -140,6 +140,8 @@ async def run(self, data: TaskData | None) -> None: Raises: InterruptedError: If interrupted """ + if self._camera is None: + await self._get_proxies() # got a target? target = data.task.target if data is not None and data.task is not None else None From 62926e7f4075b92a6b44cba9cefa51becb8ce30a Mon Sep 17 00:00:00 2001 From: Tim-Oliver Husser Date: Fri, 29 May 2026 21:27:11 +0200 Subject: [PATCH 6/6] v1.45.0 --- pyproject.toml | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e2b0a647..f29406f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "pyobs-core" -version = "1.44.10" +version = "1.45.0" description = "robotic telescope software" authors = [{ name = "Tim-Oliver Husser", email = "thusser@uni-goettingen.de" }] requires-python = ">=3.11" diff --git a/uv.lock b/uv.lock index 3a7d2e76..262cb0be 100644 --- a/uv.lock +++ b/uv.lock @@ -3120,7 +3120,7 @@ wheels = [ [[package]] name = "pyobs-core" -version = "1.44.10" +version = "1.45.0" source = { editable = "." } dependencies = [ { name = "aiohttp" },