Source code for panoptes.pocs.scheduler.scheduler

"""Base scheduler primitives and common helpers.

Defines BaseScheduler, an abstract class that loads field definitions
and manages the list of Observation objects, current selection, and common
properties used during scheduling. Concrete schedulers subclass this and
implement get_observation().
"""

import os
from abc import abstractmethod
from collections import OrderedDict
from contextlib import suppress

from astroplan import Observer
from astropy import units as u
from astropy.coordinates import get_body

from panoptes.utils import error
from panoptes.utils.serializers import from_yaml
from panoptes.utils.time import current_time

from panoptes.pocs.base import PanBase
from panoptes.pocs.scheduler.observation.base import Observation


[docs] class BaseScheduler(PanBase): """Abstract base class for schedulers. Loads fields and constraints, manages observations and the current selection, and provides helpers used by concrete schedulers. """ def __init__(self, observer, fields_list=None, fields_file=None, constraints=None, *args, **kwargs): """Loads `~pocs.scheduler.field.Field`s from a field. Note: `~pocs.scheduler.field.Field` configurations passed via the `fields_list` will not be saved but will instead be turned into `~pocs.scheduler.observation.Observations`. Further `Observations` should be added directly via the `add_observation` method. Args: observer (`astroplan.Observer`): The physical location the scheduling will take place from. fields_list (list, optional): A list of valid target configurations. fields_file (str): YAML file containing field parameters. constraints (list, optional): List of `Constraints` to apply to each observation. *args: Arguments to be passed to `PanBase` **kwargs: Keyword args to be passed to `PanBase` """ super().__init__(*args, **kwargs) assert isinstance(observer, Observer) self._observations = dict() self._current_observation = None self._fields_list = fields_list # Use the setter, which will force a file read. self.fields_file = fields_file self.observer = observer self.constraints = constraints or list() self.observed_list = OrderedDict() if self.get_config("scheduler.check_file", default=True): self.logger.debug("Reading fields list.") self.read_field_list() # Items common to each observation that shouldn't be computed each time. self.common_properties = None @property def status(self): """dict: Summary of constraints and current observation.""" return { "constraints": self.constraints, "current_observation": self.current_observation, } @property def observations(self): """Returns a dict of `~pocs.scheduler.observation.Observation` objects with `~pocs.scheduler.observation.Observation.field.field_name` as the key Note: `read_field_list` is called if list is None """ if self.has_valid_observations is False: self.read_field_list() return self._observations @property def has_valid_observations(self): """bool: True if one or more observations are currently available.""" return len(self._observations.keys()) > 0 @property def current_observation(self): """The observation that is currently selected by the scheduler Upon setting a new observation the `seq_time` is set to the current time and added to the `observed_list`. An old observation is reset (so that it can be used again - see `~pocs.scheduelr.observation.reset`). If the new observation is the same as the old observation, nothing is done. The new observation can also be set to `None` to specify there is no current observation. """ return self._current_observation @current_observation.setter def current_observation(self, new_observation): """Update the current observation and record sequencing metadata. Args: new_observation (Observation | None): New observation to select, or None to clear and reset the prior current observation. """ if self.current_observation is None: # If we have no current observation but do have a new one, set seq_time # and add to the list if new_observation is not None: # Set the new seq_time for the observation new_observation.seq_time = current_time(flatten=True) # Add the new observation to the list self.observed_list[new_observation.seq_time] = new_observation else: # If no new observation, simply reset the current if new_observation is None: self.current_observation.reset() else: # If we have a new observation, check if same as old observation if self.current_observation.name != new_observation.name: self.current_observation.reset() new_observation.seq_time = current_time(flatten=True) # Add the new observation to the list self.observed_list[new_observation.seq_time] = new_observation self.logger.info(f"Setting new observation to {new_observation}") self._current_observation = new_observation @property def fields_file(self): """Field configuration file A YAML list of config items, specifying a minimum of `name` and `position` for the `~pocs.scheduler.field.Field`. `Observation`s will be built from the list of fields. A file will be read by `~pocs.scheduler.priority.read_field_list` upon being set. Note: Setting a new `fields_file` will clear all existing fields """ return self._fields_file @fields_file.setter def fields_file(self, new_file): """Set the path to the fields YAML and reload observations. Args: new_file (str | os.PathLike | None): Path to a YAML file describing fields; if None, keeps the in-memory fields_list. """ self.clear_available_observations() self._fields_file = new_file self.read_field_list() @property def fields_list(self): """List of field configuration items A YAML list of config items, specifying a minimum of `name` and `position` for the `~pocs.scheduler.field.Field`. `Observation`s will be built from the list of fields. A file will be read by `~pocs.scheduler.priority.read_field_list` upon being set. Note: Setting a new `fields_list` will clear all existing fields """ return self._fields_list @fields_list.setter def fields_list(self, new_list): """Replace the in-memory list of field configs and rebuild observations. Args: new_list (list[dict] | None): Sequence of field configuration dicts. """ self.clear_available_observations() self._fields_list = new_list self.read_field_list()
[docs] @abstractmethod def get_observation(self, *args, **kwargs): """Get a valid observation.""" raise NotImplementedError
[docs] def clear_available_observations(self): """Reset the list of available observations""" # Clear out existing list and observations self.current_observation = None self._observations = dict()
[docs] def reset_observed_list(self): """Reset the observed list""" self.logger.debug("Resetting observed list") self.observed_list = OrderedDict()
[docs] def observation_available(self, observation, time): """Check if observation is available at given time Args: observation (pocs.scheduler.observation): An Observation object time (astropy.time.Time): The time at which to check observation """ return self.observer.target_is_up(time, observation.field, horizon=30 * u.degree)
[docs] def add_observation(self, observation_config: dict, **kwargs): """Adds an `Observation` to the scheduler. Args: observation_config (dict): Configuration dict for `Field` and `Observation`. """ try: obs = Observation.from_dict(observation_config, **kwargs) self.logger.debug(f"Observation created: {obs!r}") # Add observation to scheduler. if obs.name in self._observations: self.logger.debug(f"Overriding existing entry for {obs.name=!r}") self._observations[obs.name] = obs self.logger.debug(f"{obs!r} added to {self}.") except Exception as e: raise error.InvalidObservation(f"Invalid field: {observation_config!r} {e!r}")
[docs] def remove_observation(self, field_name): """Removes an `Observation` from the scheduler Args: field_name (str): Field name corresponding to entry key in `observations` """ with suppress(Exception): obs = self._observations[field_name] del self._observations[field_name] self.logger.debug(f"Observation removed: {obs}")
[docs] def read_field_list(self): """Reads the field file and creates valid `Observations`.""" self.logger.debug(f"Reading fields from file: {self.fields_file}") if self._fields_file is not None: if not os.path.exists(self.fields_file): raise FileNotFoundError with open(self.fields_file) as f: self._fields_list = from_yaml(f.read()) if self._fields_list is not None: for observation_config in self._fields_list: try: self.add_observation(observation_config) except Exception as e: self.logger.warning(f"Error adding observation: {e!r}")
[docs] def set_common_properties(self, time): """Sets some properties common to all observations, such as end of night, moon, etc.""" horizon_limit = self.get_config("location.observe_horizon", default=-18 * u.degree) self.common_properties = { "end_of_night": self.observer.tonight(time=time, horizon=horizon_limit)[-1], "moon": get_body("moon", time, self.observer.location), "observed_list": self.observed_list, }