Source code for panoptes.pocs.scheduler.observation.base

"""Observation base classes and types for the scheduler.

Defines the Exposure record and the Observation class used by the scheduler to
represent an observing block (target field, exposure timing/sets, and progress).
"""

import os
from collections import OrderedDict, defaultdict
from contextlib import suppress
from pathlib import Path

from astropy import units as u
from pydantic.dataclasses import dataclass

from panoptes.utils import error
from panoptes.utils.library import load_module
from panoptes.utils.utils import get_quantity_value, listify

from panoptes.pocs.base import PanBase
from panoptes.pocs.scheduler import create_constraints_from_config
from panoptes.pocs.scheduler.field import Field


@dataclass
class Exposure:
    """Metadata for a single exposure produced during an observation.

    Instances are what `Observation.exposure_list` stores, grouped per camera
    name, and what `Observation.add_to_exposure_list` appends.

    Because this is a pydantic dataclass, the declared field types are
    validated when an instance is created.
    """

    # Identifier of the image this record describes.
    image_id: str
    # Location of the image (presumably the file written by the camera —
    # confirm against the code that constructs `Exposure`).
    path: Path
    # Free-form metadata for the exposure; the expected keys are not defined here.
    metadata: dict
    # Presumably marks exposures taken by the primary camera — TODO confirm
    # with the caller; nothing in this module reads the flag.
    is_primary: bool = False
class Observation(PanBase):
    """Represents a scheduled observing block for a specific field.

    Tracks exposure configuration (exptime, set size, counts), progress, and
    metadata such as priority and filter. Provides helpers for status, paths,
    and serialization used by the scheduler and cameras.
    """

    def __init__(
        self,
        field: Field,
        exptime: float | u.Quantity | None = None,
        min_nexp: int = 60,
        exp_set_size: int = 10,
        priority: int | float = 100,
        filter_name: str | None = None,
        dark: bool = False,
        constraints: list | None = None,
        tags: list[str] | None = None,
        *args,
        **kwargs,
    ):
        """An observation of a given `panoptes.pocs.scheduler.field.Field`.

        An observation consists of a minimum number of exposures (`min_nexp`)
        that must be taken at a set exposure time (`exptime`). These exposures
        come in sets of a certain size (`exp_set_size`) where the minimum
        number of exposures must be an integer multiple of the set size.

        Note:
            An observation may consist of more exposures than `min_nexp` but
            exposures will always come in groups of `exp_set_size`.

        Args:
            field (Field): An object representing the field to be captured.
            exptime (float | u.Quantity | None, optional): Exposure time for
                individual exposures, either a float in seconds or an astropy
                Quantity with time units. If None, the config value at
                'cameras.defaults.exptime' is used, falling back to 120
                seconds. Default: None.
            min_nexp (int, optional): The minimum number of exposures to be
                taken for the field. Default: 60.
            exp_set_size (int, optional): Number of exposures to take per set;
                must be positive. Default: 10.
            priority (int | float, optional): Overall priority for the field;
                must be greater than 0. Default: 100.
            filter_name (str | None, optional): Name of the filter to be used.
                Default: None.
            dark (bool, optional): If True, exposures should be taken with the
                shutter closed. Default: False.
            constraints (list | None, optional): List of `Constraints` to apply
                to this observation. Default: empty list.
            tags (list[str] | None, optional): List of string tags to associate
                with this observation. Default: empty list.
            *args: Passed through to `PanBase.__init__`.
            **kwargs: Passed through to `PanBase.__init__`.

        Raises:
            TypeError: If `field` is not a `Field` instance.
            ValueError: If `exptime` is negative, `exp_set_size` is not
                positive, `min_nexp` is not a multiple of `exp_set_size`, or
                `priority` is not greater than 0.
        """
        super().__init__(*args, **kwargs)

        # If exptime is None, get the default from camera config.
        if exptime is None:
            exptime = self.get_config("cameras.defaults.exptime", default=120)

        # Normalise to a Quantity in seconds regardless of the input form.
        exptime = get_quantity_value(exptime, u.second) * u.second

        if not isinstance(field, Field):
            raise TypeError(f"field must be a valid Field instance, got {type(field)}.")

        if exptime < 0 * u.second:  # 0 second exposures correspond to bias frames
            raise ValueError(f"Exposure time must be greater than or equal to 0, got {exptime}.")

        # Without this guard a zero set size surfaces as an unrelated
        # ZeroDivisionError below, and a negative one passes the modulo check
        # and produces a negative set duration.
        if exp_set_size <= 0:
            raise ValueError(
                f"Exposure set size must be greater than 0, got exp_set_size={exp_set_size}."
            )

        if not min_nexp % exp_set_size == 0:
            raise ValueError(
                f"Minimum number of exposures (min_nexp={min_nexp}) must be "
                f"a multiple of set size (exp_set_size={exp_set_size})."
            )

        if not float(priority) > 0.0:
            raise ValueError("Priority must be larger than 0.")

        self.field = field
        self.dark = dark

        self._exptime = exptime
        self.min_nexp = min_nexp
        self.exp_set_size = exp_set_size

        # Exposures grouped by camera name, and pointing images in insertion order.
        self.exposure_list: dict[str, list[Exposure]] = defaultdict(list)
        self.pointing_images: dict[str, Path] = OrderedDict()

        self.priority = float(priority)

        self.filter_name = filter_name

        # Cached durations; refreshed by the `exptime` setter.
        self._min_duration = self.exptime * self.min_nexp
        self._set_duration = self.exptime * self.exp_set_size

        self._image_dir = self.get_config("directories.images")
        self._directory = None
        self._seq_time = None

        self.merit = 0.0

        self.constraints = constraints or []
        self.tags = tags or []

        self.reset()

        self.logger.debug(f"Observation created: {self}")

    ################################################################################################
    # Properties
    ################################################################################################

    @property
    def status(self) -> dict:
        """Observation status.

        Returns:
            dict: Dictionary containing current status of observation, with
                quantities reduced to plain values via `get_quantity_value`.
        """
        status = {
            "current_exp": self.current_exp_num,
            "dec_mnt": get_quantity_value(self.field.coord.dec),
            "equinox": get_quantity_value(self.field.coord.equinox, unit="jyear_str"),
            "exp_set_size": self.exp_set_size,
            "exptime": get_quantity_value(self.exptime),
            "field_name": self.name,
            "merit": self.merit,
            "min_nexp": self.min_nexp,
            "minimum_duration": get_quantity_value(self.minimum_duration),
            "priority": self.priority,
            "ra_mnt": get_quantity_value(self.field.coord.ra),
            "seq_time": self.seq_time,
            "set_duration": get_quantity_value(self.set_duration),
            "dark": self.dark,
            "tags": self.tags,
        }

        return status

    @property
    def exptime(self):
        """Exposure time per image for this observation (Quantity in seconds)."""
        return self._exptime

    @exptime.setter
    def exptime(self, value):
        """Set the exposure time (Quantity or seconds).

        The cached minimum and set durations are recomputed so that
        `minimum_duration`, `set_duration` and `status` stay consistent with
        the new exposure time.
        """
        self._exptime = get_quantity_value(value, u.second) * u.second
        self._min_duration = self.exptime * self.min_nexp
        self._set_duration = self.exptime * self.exp_set_size

    @property
    def exptimes(self):
        """Exposure time as a list."""
        return listify(self.exptime)

    @property
    def minimum_duration(self):
        """Minimum amount of time to complete the observation."""
        return self._min_duration

    @property
    def set_duration(self):
        """Amount of time per set of exposures."""
        return self._set_duration

    @property
    def name(self):
        """Name of the `~pocs.scheduler.field.Field` associated with the observation."""
        return self.field.name

    @property
    def seq_time(self):
        """The time at which the observation was selected by the scheduler.

        This is used for path name construction.
        """
        return self._seq_time

    @seq_time.setter
    def seq_time(self, time):
        """Set the scheduler selection time stamp (usually an ISO string)."""
        self._seq_time = time

    @property
    def directory(self) -> str:
        """Return the directory for this Observation.

        This returns the base directory for the Observation. It does *not*
        include the subfolders for each of the cameras. The value is built
        once, on first access, and cached.

        Returns:
            str: Full path to base directory.
        """
        if self._directory is None:
            self._directory = os.path.join(self._image_dir, self.field.field_name)
            self.logger.info(f"Observation directory set to {self._directory}")

        return self._directory

    @property
    def current_exp_num(self) -> int:
        """Return the current number of exposures.

        Returns:
            int: The length of the longest per-camera list in
                `self.exposure_list`, or 0 when no camera has an entry.
        """
        return max((len(exposures) for exposures in self.exposure_list.values()), default=0)

    @property
    def first_exposure(self) -> list[dict[str, Exposure]]:
        """Return the first exposure information.

        Returns:
            list: One ``{cam_name: Exposure}`` dict per camera that has a
                first exposure; empty if there are none.
        """
        return self.get_exposure(0)

    @property
    def last_exposure(self) -> list[dict[str, Exposure]]:
        """Return the latest exposure information.

        Returns:
            list: One ``{cam_name: Exposure}`` dict per camera holding that
                camera's most recent exposure; empty if there are none.
        """
        return self.get_exposure(number=-1)

    def get_exposure(self, number: int = 0) -> list[dict[str, Exposure]]:
        """Returns the given exposure number for every camera.

        Args:
            number (int, optional): Index into each camera's exposure list;
                negative values count from the end. Default: 0.

        Returns:
            list: One ``{cam_name: Exposure}`` dict per camera that has an
                exposure at `number`. Cameras without one are skipped.
        """
        exposures = []
        try:
            for cam_name, cam_exposures in self.exposure_list.items():
                # A camera with too few exposures is skipped, not an error.
                with suppress(IndexError):
                    exposures.append({cam_name: cam_exposures[number]})
        except Exception:
            # Best-effort lookup: keep whatever was collected so far.
            self.logger.debug("No exposures available.")

        # Returned after the try block rather than from a `finally` clause, so
        # that KeyboardInterrupt/SystemExit are not silently discarded.
        return exposures

    @property
    def pointing_image(self):
        """Return the last pointing image.

        Returns:
            tuple | None: The most recently added ``(key, value)`` item of
                `self.pointing_images`, or None (with a warning logged) when
                there are no pointing images.
        """
        try:
            return list(self.pointing_images.items())[-1]
        except IndexError:
            self.logger.warning("No pointing image available")
            return None

    @property
    def set_is_finished(self):
        """Check if the current observing block has finished.

        The block is finished when the minimum number of exposures has been
        obtained and an integer number of sets has been completed.

        Returns:
            bool: True if finished, False if not.
        """
        # Check the min required number of exposures have been obtained
        has_min_exposures = self.current_exp_num >= self.min_nexp

        # Check if the current set is finished
        this_set_finished = self.current_exp_num % self.exp_set_size == 0

        return has_min_exposures and this_set_finished

    ################################################################################################
    # Methods
    ################################################################################################

    def add_to_exposure_list(self, cam_name: str, exposure: Exposure):
        """Append the exposure to the list kept for `cam_name`."""
        self.exposure_list[cam_name].append(exposure)

    def reset(self):
        """Resets the exposure information for the observation.

        Clears all recorded exposures and sets `merit` and `seq_time` back to
        their initial values.
        """
        self.logger.debug(f"Resetting observation {self}")

        self.exposure_list.clear()
        self.merit = 0.0
        self.seq_time = None

    def to_dict(self):
        """Serialize the object to a dict.

        The observation settings are placed at the top level next to
        ``"field"``. `from_dict` reads them from an ``"observation"`` sub-dict
        instead, so passing this output to `from_dict` uses only the
        ``"field"`` entry.

        Returns:
            dict: The field (via `Field.to_dict`), exposure settings,
                priority, filter name, dark flag and tags.
        """
        return dict(
            field=self.field.to_dict(),
            exptime=get_quantity_value(self.exptime),
            min_nexp=self.min_nexp,
            exp_set_size=self.exp_set_size,
            priority=self.priority,
            filter_name=self.filter_name,
            dark=self.dark,
            tags=self.tags,
        )

    ################################################################################################
    # Private Methods
    ################################################################################################

    def __str__(self):
        return (
            f"{self.field}: {self.exptime} exposures "
            f"in blocks of {self.exp_set_size}, "
            f"minimum {self.min_nexp}, "
            f"priority {self.priority:.0f}"
        )

    def __repr__(self):
        return (
            f"<Observation: {self.name} "
            f"exptime={self.exptime}, "
            f"min_nexp={self.min_nexp}, "
            f"exp_set_size={self.exp_set_size}, "
            f"priority={self.priority}, "
            f"constraints={self.constraints}>"
        )

    ################################################################################################
    # Class Methods
    ################################################################################################

    @classmethod
    def from_dict(
        cls,
        observation_config: dict,
        field_class="panoptes.pocs.scheduler.field.Field",
        observation_class="panoptes.pocs.scheduler.observation.base.Observation",
    ):
        """Creates an `Observation` object from config dict.

        Args:
            observation_config (dict): Configuration for `Field` and
                `Observation`, read from its "field" and "observation" keys.
                The input dict and those two sub-dicts are copied, not mutated.
            field_class (str, optional): The full name of the python class to
                be used as default for the observation's Field. This can be
                overridden by specifying the "type" item under the
                observation_config's "field" key.
                Default: `panoptes.pocs.scheduler.field.Field`.
            observation_class (str, optional): The full name of the python
                class to be used as default for the observation object. This
                can be overridden by specifying the "type" item under the
                observation_config's "observation" key.
                Default: `panoptes.pocs.scheduler.observation.base.Observation`.

        Returns:
            The object built by the selected observation class.

        Raises:
            error.InvalidObservation: If building the field, the constraints
                or the observation fails for any reason; the original
                exception is attached as the cause.
        """
        observation_config = observation_config.copy()

        field_config = observation_config.get("field", {}).copy()
        field_type_name = field_config.pop("type", field_class)

        obs_config = observation_config.get("observation", {}).copy()
        obs_type_name = obs_config.pop("type", observation_class)

        try:
            # Make the field
            field = load_module(field_type_name)(**field_config)

            # If the observation has constraints, make those first.
            if "constraints" in obs_config:
                obs_config["constraints"] = create_constraints_from_config(obs_config)

            obs = load_module(obs_type_name)(field=field, **obs_config)
        except Exception as e:
            raise error.InvalidObservation(f"Invalid field: {observation_config!r} {e!r}") from e
        else:
            return obs