Source code for panoptes.pocs.mount.mount

"""Abstract base class and helpers for telescope mounts.

Provides AbstractMount, a hardware-agnostic base with common properties
(status, tracking, parking, slewing) and orchestration helpers (target
handling, coordinate conversion, command table mapping). Concrete mount
implementations (iOptron, Bisque, simulator, etc.) subclass this and
implement device-specific read/write/query and motion control.
"""

import time
from abc import abstractmethod
from pathlib import Path

from astropy import units as u
from astropy.coordinates import EarthLocation, SkyCoord

from panoptes.utils import error
from panoptes.utils.serializers import from_yaml
from panoptes.utils.time import CountdownTimer, current_time
from panoptes.utils.utils import get_quantity_value

from panoptes.pocs.base import PanBase


[docs] class AbstractMount(PanBase): """ Abstract Base class for controlling a mount. This provides the basic functionality for the mounts. Sub-classes should override the `initialize` method for mount-specific issues as well as any helper methods specific mounts might need. See "NotImplemented Methods" section of this module. Sets the following properties: - self.non_sidereal_available = False - self.PEC_available = False - self._is_initialized = False Args: config (dict): Custom configuration passed to base mount. This is usually read from the main system config. commands (dict): Commands for the telescope. These are read from a yaml file that maps the mount-specific commands to common commands. location (EarthLocation): An `astropy.coordinates.EarthLocation` that contains location information. """ def __init__(self, location, commands=None, *args, **kwargs): super().__init__(*args, **kwargs) assert isinstance(location, EarthLocation) # Create an object for just the mount config items self.mount_settings = self.get_config("mount.settings", dict()) self.logger.debug(f"Mount settings: {self.mount_settings}") # setup commands for mount self.logger.debug("Setting up commands for mount") self._setup_commands(commands) self.logger.debug("Mount commands set up") self._mount_version = None # Set the initial location self._location = location # Get mount settings from config. self.non_sidereal_available = self.mount_settings.setdefault("non_sidereal_available", False) self.PEC_available = self.mount_settings.setdefault("PEC_available", False) # Initial states self._is_connected = False self._is_initialized = False self._is_slewing = False self._is_parked = True self._at_mount_park = True self._is_tracking = False self._is_home = False self._state = "Parked" self.sidereal_rate = (360 * u.degree).to(u.arcsec) / (86164 * u.second) self.ra_guide_rate = 0.9 # Sidereal self.dec_guide_rate = 0.9 # Sidereal self._tracking_rate = 1.0 # Sidereal self._tracking = "Sidereal" self.min_tracking_threshold = self.mount_settings.setdefault("min_tracking_threshold", 100) # ms self.max_tracking_threshold = self.mount_settings.setdefault("max_tracking_threshold", 99999) # ms self._movement_speed = "" # Set initial coordinates self._target_coordinates = None self._current_coordinates = None self._park_coordinates = None self.brand = self.get_config("mount.brand", "") self.model = self.get_config("mount.model", "") self.port = self.get_config("mount.serial.port") def __str__(self): mount_str = f"{self.brand} {self.model}" if self.port is not None: mount_str = f"{mount_str} {self.port}" return mount_str
[docs] @abstractmethod def connect(self): """Connect to the mount.""" raise NotImplementedError
[docs] @abstractmethod def initialize(self, *arg, **kwargs): """Initialize the mount hardware and prepare for use. Subclasses should perform device-specific setup such as establishing communications, homing, and applying any required configuration so the mount is ready to slew and track. """ raise NotImplementedError
[docs] def disconnect(self): """Disconnect from the mount, parking if necessary. If the mount is not currently parked this will issue a park command before closing the connection, then mark the connection as closed. """ self.logger.info("Disconnecting mount") if not self.is_parked: self.park() self._is_connected = False
[docs] def update_status(self): """Thin-wrapper to call the status property.""" self.logger.debug(f"Updating mount status: {self.status}")
@property def status(self): """Return a snapshot of current mount parameters and positions. Returns: dict: Mapping of basic rates, movement speed, current coordinates (if available), and target coordinates (if set), augmented by subclass-specific fields from _update_status(). """ self.logger.trace("Getting mount status") status = {} try: status["tracking_rate"] = self.tracking_rate status["ra_guide_rate"] = self.ra_guide_rate status["dec_guide_rate"] = self.dec_guide_rate status["movement_speed"] = self.movement_speed current_coord = self.get_current_coordinates() self.logger.trace(f"{current_coord=}") if current_coord is not None: self.logger.trace("Getting current_ra and current_dec") status["current_ra"] = get_quantity_value(current_coord.ra, unit="degree") status["current_dec"] = get_quantity_value(current_coord.dec, unit="degree") if self.has_target: target_coord = self.get_target_coordinates() self.logger.trace(f"{target_coord=}") status["mount_target_ra"] = get_quantity_value(target_coord.ra, unit="degree") status["mount_target_dec"] = get_quantity_value(target_coord.dec, unit="degree") except Exception as e: self.logger.trace(f"Problem getting mount status: {e!r}") self.logger.trace("Updating status with _update_status") status.update(self._update_status()) return status @property def location(self): """astropy.coordinates.SkyCoord: The location details for the mount. When a new location is set,`_setup_location_for_mount` is called, which will update the mount with the current location. It is anticipated the mount won't change locations while observing so this should only be done upon mount initialization. """ return self._location @location.setter def location(self, location): """Update the mount location and propagate to the hardware. Args: location (astropy.coordinates.EarthLocation): New EarthLocation to use. """ self._location = location # If the location changes we need to update the mount self._setup_location_for_mount() @property def is_connected(self): """bool: Checks the serial connection on the mount to determine if connection is open""" return self._is_connected @property def is_initialized(self): """bool: Has mount been initialised with connection""" return self._is_initialized @property def is_parked(self): """bool: Mount parked status.""" return self._is_parked @property def at_mount_park(self): """bool: True if mount is at park position.""" return self._at_mount_park @property def is_home(self): """bool: Mount home status.""" return self._is_home @property def is_tracking(self): """bool: Mount tracking status.""" return self._is_tracking @property def is_slewing(self): """bool: Mount slewing status.""" return self._is_slewing @property def state(self): """bool: Mount state.""" return self._state @property def movement_speed(self): """bool: Movement speed when button pressed.""" return self._movement_speed @property def has_target(self): """Whether a target coordinate has been set for the mount. Returns: bool: True if set_target_coordinates has been called successfully. """ return self._target_coordinates is not None @property def tracking_rate(self): """bool: Mount tracking rate""" return self._tracking_rate @tracking_rate.setter def tracking_rate(self, value): """Set the tracking rate""" self._tracking_rate = value @property def mount_version(self): """str: Mount version""" return self._mount_version
[docs] def get_target_coordinates(self): """Gets the RA and Dec for the mount's current target. This does NOT necessarily reflect the current position of the mount, see `get_current_coordinates`. Returns: astropy.coordinates.SkyCoord: """ return self._target_coordinates
[docs] def set_target_coordinates(self, coords): """Sets the RA and Dec for the mount's current target. Args: coords (astropy.coordinates.SkyCoord): coordinates specifying target location Returns: bool: Boolean indicating success """ target_set = False # Save the skycoord coordinates self.logger.debug(f"Setting target coordinates: {coords}") self._target_coordinates = coords # Get coordinate format from mount specific class mount_coords = self._skycoord_to_mount_coord(self._target_coordinates) # Send coordinates to mount try: self.query("set_ra", mount_coords[0]) self.query("set_dec", mount_coords[1]) target_set = True except Exception as e: self.logger.warning(f"Problem setting mount coordinates: {mount_coords} {e!r}") self.logger.debug(f"Mount set target coordinates: {target_set}") return target_set
[docs] def get_current_coordinates(self): """Reads out the current coordinates from the mount. Note: See `_mount_coord_to_skycoord` and `_skycoord_to_mount_coord` for translation of mount specific coordinates to astropy.coordinates.SkyCoord Returns: astropy.coordinates.SkyCoord """ mount_coords = self.query("get_coordinates") # Turn the mount coordinates into a SkyCoord self._current_coordinates = self._mount_coord_to_skycoord(mount_coords) return self._current_coordinates
[docs] def distance_from_target(self): """Get current distance from target Returns: u.Angle: An angle represeting the current on-sky separation from the target """ target = self.get_target_coordinates().coord separation = self.get_current_coordinates().separation(target, origin_mismatch="ignore") self.logger.debug(f"Current separation from target: {separation}") return separation
[docs] def get_tracking_correction( self, offset_info, pointing_ha, min_tracking_threshold=None, max_tracking_threshold=None ): """Determine the needed tracking corrections from current position. This method will determine the direction and number of milliseconds to correct the mount for each axis in order to correct for any tracking drift. The Declination axis correction ('north' or 'south') depends on the movement of the camera box with respect to the pier, which can be determined from the Hour Angle (HA) of the pointing image in the sequence. Note: Correction values below 50ms will be skipped and values above 99999ms will be clipped. Args: offset_info (`OffsetError`): A named tuple describing the offset error. See `pocs.images.OffsetError`. pointing_ha (float): The Hour Angle (HA) of the mount at the beginning of the observation sequence in degrees. This affects the direction of the Dec adjustment. min_tracking_threshold (int, optional): Minimum size of tracking correction allowed in milliseconds. Tracking corrections lower than this are ignored. Default 100ms from `self.min_tracking_threshold`. max_tracking_threshold (int, optional): Maximum size of tracking correction allowed in milliseconds. Tracking corrections higher than this are set to this value. Default 99999ms from `self.max_tracking_threshold`. Returns: dict: Offset corrections for each axis as needed. Example: { # axis: (arcsec, millisecond, direction) 'ra': (float, float, str), 'dec': (float, float, str), } """ pier_side = "east" if 0 <= pointing_ha <= 12: pier_side = "west" self.logger.debug(f"Mount pier side: {pier_side} {pointing_ha:.02f}") if min_tracking_threshold is None: min_tracking_threshold = self.min_tracking_threshold if max_tracking_threshold is None: max_tracking_threshold = self.max_tracking_threshold axis_corrections = { "dec": None, "ra": None, } for axis in axis_corrections.keys(): # find the number of ms and direction for Dec axis offset = getattr(offset_info, f"delta_{axis}") offset_ms = self.get_ms_offset(offset, axis=axis) if axis == "dec": # Determine which direction to move based on direction mount # is moving (i.e. what side it started on). if pier_side == "east": if offset_ms >= 0: delta_direction = "north" else: delta_direction = "south" else: if offset_ms >= 0: delta_direction = "south" else: delta_direction = "north" else: if offset_ms >= 0: delta_direction = "west" else: delta_direction = "east" offset_ms = abs(offset_ms.value) self.logger.debug(f"Tracking offset: {offset_ms} ms") # Skip short corrections if offset_ms <= min_tracking_threshold: self.logger.debug(f"Min tracking threshold: {min_tracking_threshold} ms") self.logger.debug("Requested tracking lower than threshold, skipping correction") continue # Correct long offset if offset_ms > max_tracking_threshold: self.logger.debug(f"Max tracking threshold: {max_tracking_threshold} ms") self.logger.debug("Requested tracking higher than threshold, setting to threshold") offset_ms = max_tracking_threshold self.logger.debug(f"{axis}: {delta_direction} {offset_ms:.02f} ms") axis_corrections[axis] = (offset, offset_ms, delta_direction) return axis_corrections
[docs] def correct_tracking(self, correction_info, axis_timeout=30.0): """Make tracking adjustment corrections. Args: correction_info (dict[tuple]): Correction info to be applied, see `get_tracking_correction`. axis_timeout (float, optional): Timeout for adjustment in each axis, default 30 seconds. Raises: `error.Timeout`: Timeout error. """ for axis, corrections in correction_info.items(): if not axis or not corrections: continue offset = corrections[0] offset_ms = corrections[1] delta_direction = corrections[2] self.logger.info(f"Adjusting {axis}: {delta_direction} {offset_ms:0.2f} ms {offset:0.2f}") self.query(f"move_ms_{delta_direction}", f"{offset_ms:05.0f}") # Adjust tracking for `axis_timeout` seconds then fail if not done. start_tracking_time = current_time() while self.is_tracking is False: if (current_time() - start_tracking_time).sec > axis_timeout: raise error.Timeout(f"Tracking adjustment timeout: {axis}") self.logger.debug(f"Waiting for {axis} tracking adjustment") time.sleep(0.5)
[docs] def slew_to_coordinates(self, coords, *args, **kwargs): """Slews to given coordinates. Args: coords (astropy.SkyCoord): The coordinates to slew to. Returns: bool: indicating success """ if not isinstance(coords, SkyCoord): raise TypeError( "coords should be an instance of astropy.coordinates.SkyCoord, got {type(coords)}." ) response = 0 if not self.is_parked: # Set the coordinates if self.set_target_coordinates(coords): response = self.slew_to_target(*args, **kwargs) else: self.logger.warning("Could not set target_coordinates") return response
[docs] def home_and_park(self, *args, **kwargs): """Convenience method to first slew to the home position and then park.""" if not self.is_parked: self.slew_to_home(blocking=True) # Reinitialize from home seems to always do the trick of getting us to # correct side of pier for parking self._is_initialized = False self.initialize() self.park(*args, **kwargs) while self.is_slewing and not self.is_parked: time.sleep(5) self.logger.debug("Slewing to park, sleeping for 5 seconds") self.logger.debug("Mount parked")
[docs] def slew_to_target(self, blocking: bool = False, timeout: int | float = 180) -> bool: """Slews to the currently assigned target coordinates. Slews the mount to the coordinates that have been assigned by `~set_target_coordinates`. If no coordinates have been set, do nothing and return `False`, otherwise return response from the mount. If `blocking=True` then wait for up to `timeout` seconds for the mount to reach the `is_tracking` state. If a timeout occurs, raise a `pocs.error.Timeout` exception. Args: blocking (bool, optional): If command should block while slewing to home, default False. timeout (int, optional): Maximum time spent slewing to home, default 180 seconds. Returns: bool: indicating success """ success = False if self.is_parked: self.logger.info("Mount is parked") elif not self.has_target: self.logger.info("Target Coordinates not set") else: self.logger.debug("Slewing to target") success = bool(self.query("slew_to_target")) self.logger.debug(f"Mount response: {success}") if success: if blocking: # Set up the timeout timer self.logger.debug(f"Setting slew timeout timer for {timeout} sec") timeout_timer = CountdownTimer(timeout, name="SlewToTarget") block_time = 3 # seconds # Check the status, which updates the tracking status. self.update_status() while self.is_tracking is False: if timeout_timer.expired(): self.logger.warning(f"slew_to_target timout: {timeout} seconds") raise error.Timeout("Problem slewing to target") self.logger.debug(f"Slewing to target, sleeping for {block_time} seconds") timeout_timer.sleep(max_sleep=block_time) # Check the status, which updates the tracking status. self.update_status() self.logger.debug("Done with slew_to_target block") else: self.logger.warning("Problem with slew_to_target") return success
[docs] def slew_to_home(self, blocking=True, timeout=180): """Slews the mount to the home position. Note: Home position and Park position are not the same thing Args: blocking (bool, optional): If command should block while slewing to home, default True. timeout (int, optional): Maximum time spent slewing to home, default 180 seconds. Returns: bool: indicating success """ response = 0 # Set up the timeout timer timeout_timer = CountdownTimer(timeout) block_time = 3 # seconds if not self.is_parked: # Reset target coordinates self._target_coordinates = None # Start the slew response = self.query("slew_to_home") if response and blocking: while self.is_home is False: if timeout_timer.expired(): self.logger.warning(f"slew_to_home timout: {timeout} seconds") response = 0 break self.logger.debug(f"Slewing to home, sleeping for {block_time} seconds") timeout_timer.sleep(max_sleep=block_time) self.update_status() else: self.logger.info("Mount is parked") return response
[docs] def slew_to_zero(self, blocking=False): """Calls `slew_to_home` in base class. Can be overridden.""" self.slew_to_home(blocking=blocking)
[docs] def park(self, *args, **kwargs): """Slews to the park position and parks the mount. The park position must be set manually first for this method to work. Most mount subclasses will override this method to provide mount-specific park functionality. Note: When mount is parked no movement commands will be accepted. Returns: bool: indicating success """ response = self.query("park") if response: self.logger.debug("Slewing to park") else: self.logger.warning("Problem with slew_to_park") while not self.is_parked: self._update_status() time.sleep(2) self._is_parked = True return response
[docs] def unpark(self): """Unparks the mount. Does not do any movement commands but makes them available again. Returns: bool: indicating success """ response = self.query("unpark") if response: self._is_parked = False self._at_mount_park = False self.logger.debug("Mount unparked") else: self.logger.warning("Problem with unpark") return response
[docs] def move_direction(self, direction="north", seconds=1.0): """Move the mount in a specified direction for a duration. Args: direction (str): One of {"north", "south", "east", "west"} indicating the direction to move. seconds (float): Duration in seconds to move in the given direction. Raises: AssertionError: If the provided direction is not valid. Exception: Propagates any low-level I/O exceptions encountered while issuing movement commands to the mount; movement is stopped in the finally block regardless. """ seconds = float(seconds) assert direction in ["north", "south", "east", "west"] short_direction = direction[0].lower() # Use low-level commands without a lookup to make timing exact. move_command = f":m{short_direction}#" stop_command = ":Q#" try: self.logger.debug(f"Moving {direction} for {seconds} seconds. ") t0 = time.perf_counter() self.write(move_command) time.sleep(seconds) self.write(stop_command) t1 = time.perf_counter() self.logger.debug(f"{(t1 - t0):.02f} seconds passed total") except KeyboardInterrupt: self.logger.warning("Keyboard interrupt, stopping movement.") except Exception as e: self.logger.warning(f"Problem moving mount! Make sure mount has stopped moving: {e!r}") finally: # Note: We do this twice. That's fine. self.logger.debug("Stopping movement") self.query("stop_moving")
[docs] def get_ms_offset(self, offset, axis="ra"): """Get offset in milliseconds at current speed Args: offset (astropy.units.Angle): Offset in arcseconds axis (str): The name of the axis to move, default 'ra'. Returns: astropy.units.Quantity: Offset in milliseconds at current speed """ rates = { "ra": self.ra_guide_rate, "dec": self.dec_guide_rate, } guide_rate = rates[axis] return (offset / (self.sidereal_rate * guide_rate)).to(u.ms)
[docs] def query(self, cmd, params=None, **kwargs): """Send a command to the mount and return the response. The command key is translated to a mount-specific serial command (using the loaded commands YAML), written to the connection, and the response is read back. Args: cmd (str): Logical command name defined in the mount commands YAML file. params (str | None): Optional parameter string to include with the command. **kwargs: Additional keyword arguments forwarded to the low-level `read()` call. Returns: str: The raw response from the mount. Raises: AssertionError: If the mount has not been initialized. error.InvalidMountCommand: If the command key is not known. Examples: >>> from panoptes.pocs.mount import create_mount_from_config # doctest: +SKIP >>> mount = create_mount_from_config() # doctest: +SKIP >>> mount.query('set_local_time', '101503') # doctest: +SKIP '1' >>> mount.query('get_local_time') # doctest: +SKIP '101503' """ assert self.is_initialized, self.logger.warning("Mount has not been initialized") full_command = self._get_command(cmd, params=params) self.write(full_command) response = self.read(**kwargs) return response
def _get_expected_response(self, cmd): """Looks up appropriate response for command for telescope""" # Get the actual command cmd_info = self.commands.get(cmd) if cmd_info is not None: response = cmd_info.get("response") self.logger.trace(f"Mount Command Response: {response}") else: raise error.InvalidMountCommand(f"No result for command {cmd}") return response def _setup_commands(self, commands): """ Does any setup for the commands needed for this mount. Mostly responsible for setting the pre- and post-commands. We could also do some basic checking here to make sure required commands are in fact available. """ commands = commands or dict() self.logger.debug("Setting up commands for mount.") if len(commands) == 0: base_dir = Path(self.get_config("directories.base")) mount_dir = base_dir / self.get_config("directories.mounts") commands_file = self.get_config("mount.commands_file") if commands_file is None: self.logger.debug('No "commands_file" key found, attempting to use brand and model') brand = self.get_config("mount.brand") model = self.get_config("mount.model") commands_file = f"{brand}/{model}" commands_file = Path(f"{mount_dir}/{commands_file}.yaml") self.logger.info(f"Loading mount commands file: {commands_file}") try: with commands_file.open() as f: commands.update(from_yaml(f.read(), parse=False)) self.logger.debug(f"Mount commands updated from {commands_file}") except Exception as err: self.logger.warning(f"Error loading {commands_file=} {err!r}") # Get the pre- and post- commands self._pre_cmd = commands.setdefault("cmd_pre", ":") self._post_cmd = commands.setdefault("cmd_post", "#") self.commands = commands self.logger.debug("Mount commands set up")
[docs] def search_for_home(self): """Search for the home position if supported.""" self.logger.warning( "Searching for home position not supported." "Please set the home position manually via the hand-controller." )
[docs] @abstractmethod def write(self, cmd): """Low-level write to the mount connection. Args: cmd (str): Fully formatted command string to send to the device. """ raise NotImplementedError
[docs] @abstractmethod def read(self, *args, **kwargs): """Low-level read from the mount connection. Args: *args: Transport-specific positional options (e.g., size). **kwargs: Transport-specific keyword options (e.g., timeout). Returns: str: Raw response string from the device. """ raise NotImplementedError
[docs] @abstractmethod def set_tracking_rate(self, direction="ra", delta=1.0): """Sets the tracking rate for the mount""" raise NotImplementedError
@abstractmethod def _setup_location_for_mount(self): """Sets the current location details for the mount.""" raise NotImplementedError @abstractmethod def _set_zero_position(self): """Sets the current position as the zero (home) position.""" raise NotImplementedError @abstractmethod def _get_command(self, cmd, params=None): raise NotImplementedError @abstractmethod def _mount_coord_to_skycoord(self, coords_str): raise NotImplementedError @abstractmethod def _skycoord_to_mount_coord(self, coords): raise NotImplementedError @abstractmethod def _update_status(self): raise NotImplementedError