Source code for panoptes.pocs.mount.simulator

"""Simple in-memory mount simulator for development and tests.

Implements the AbstractMount interface with timed state changes and canned
responses so higher-level logic can be exercised without hardware.
"""

import time
from threading import Timer

from astropy import units as u

from panoptes.utils import error
from panoptes.utils.time import current_time

from panoptes.pocs.mount import AbstractMount


[docs] class Mount(AbstractMount): """Mount class for a simulator. Use this when you don't actually have a mount attached.""" def __init__(self, location, *args, **kwargs): super().__init__(location, *args, **kwargs) self.logger.info("Using simulator mount") self._loop_delay = self.get_config("loop_delay", default=0.01) self.logger.debug("Simulator mount created")
[docs] def initialize(self, unpark=False, *arg, **kwargs): """Initialize the connection with the mount and setup for location. iOptron mounts are initialized by sending the following two commands to the mount:e * Version * MountInfo If the mount is successfully initialized, the `_setup_location_for_mount` method is also called. Returns: bool: Returns the value from `self._is_initialized`. """ self.logger.debug("Initializing simulator mount") if not self.is_connected: self.connect() self._is_initialized = True self._setup_location_for_mount() if unpark: self.unpark() return self._is_initialized
[docs] def connect(self): """Connect to the simulated mount (instant success).""" self.logger.debug("Connecting to mount simulator") self._is_connected = True return True
[docs] def disconnect(self): """Disconnect from the simulated mount (instant success).""" self.logger.debug("Disconnecting mount simulator") self._is_connected = False return True
def _update_status(self): self.logger.debug("Getting mount simulator status") status = dict() status["timestamp"] = current_time().isot status["tracking_rate_ra"] = self.tracking_rate status["state"] = self.state return status
[docs] def move_direction(self, direction="north", seconds=1.0): """Move mount in specified `direction` for given amount of `seconds`""" self.logger.debug(f"Mount simulator moving {direction} for {seconds} seconds") time.sleep(seconds)
[docs] def get_ms_offset(self, offset, axis="ra"): """Fake offset in milliseconds Args: offset (astropy.units.Angle): Offset in arcseconds Returns: astropy.units.Quantity: Offset in milliseconds at current speed """ offset = 25 * u.arcsecond # Fake value return super().get_ms_offset(offset, axis=axis)
[docs] def slew_to_target(self, slew_delay=0.5, *args, **kwargs): """Simulate slewing to the current target, then begin tracking. Args: slew_delay (float): Seconds to wait before setting tracking True. *args, **kwargs: Forwarded to AbstractMount.slew_to_target. Returns: bool: True if the superclass slew_to_target reports success. """ self._is_tracking = False # Set up a timer to trigger the `is_tracking` property. def trigger_tracking(): self.logger.debug("Triggering mount simulator tracking") self._is_tracking = True timer = Timer(slew_delay, trigger_tracking) timer.start() try: success = super().slew_to_target(*args, **kwargs) except error.Timeout: # Cancel the timer and re-throw exception timer.cancel() raise error.Timeout self._current_coordinates = self.get_target_coordinates() return success
[docs] def get_current_coordinates(self): """Return the simulator's current coordinates (SkyCoord or None).""" return self._current_coordinates
[docs] def stop_slew(self, next_position="is_tracking"): """Stop slewing and set the next simulated state flag. Args: next_position (str): One of 'is_tracking', 'is_home', etc.; the corresponding private flag will be set to True. """ self.logger.debug("Stopping slewing") # Set all to false then switch one below self._is_slewing = False self._is_tracking = False self._is_home = False # We actually set the hidden variable directly next_position = "_" + next_position if hasattr(self, next_position): self.logger.debug(f"Setting next position to {next_position}") setattr(self, next_position, True)
[docs] def slew_to_home(self, blocking=False, timeout=1): """Slews the mount to the home position. Note: Home position and Park position are not the same thing Returns: bool: indicating success """ self.logger.debug("Slewing to home") self._is_slewing = True self._is_tracking = False self._is_home = False self._is_parked = False self.stop_slew(next_position="is_home")
[docs] def park(self): """Sets the mount to park for simulator""" self.logger.debug("Setting to park") self._state = "Parked" self._is_slewing = False self._is_tracking = False self._is_home = False self._is_parked = True
[docs] def unpark(self): """Unpark the simulated mount (sets connected and clears parked).""" self.logger.debug("Unparking mount") self._is_connected = True self._is_parked = False return True
[docs] def query(self, cmd, params=None, **kwargs): """Simulate a mount command query. Args: cmd (str): Logical command name. params (str | dict | None): Optional parameters for the command. Returns: bool: Always True for the simulator; may sleep briefly for actions. """ self.logger.debug(f"Query cmd: {cmd} params: {params!r}") if cmd == "slew_to_target": time.sleep(self._loop_delay) return True
[docs] def write(self, cmd): """Simulate writing a command to the mount (logs only).""" self.logger.debug(f"Write: {cmd}")
[docs] def read(self, *args): """Simulate reading a response from the mount (logs only).""" self.logger.debug("Read")
[docs] def set_tracking_rate(self, direction="ra", delta=0.0): """Set a simulated custom tracking rate. Args: direction (str): Axis label ('ra' or 'dec'). delta (float): Offset multiple of sidereal rate. """ self.logger.debug(f"Setting tracking rate delta: {direction} {delta}") self.tracking = "Custom" self.tracking_rate = 1.0 + delta self.logger.debug("Custom tracking rate sent")
def _setup_location_for_mount(self): """Sets the mount up to the current location. Mount must be initialized first.""" assert self.is_initialized, self.logger.warning("Mount has not been initialized") assert self.location is not None, self.logger.warning("Please set a location before attempting setup") self.logger.debug("Setting up mount for location") def _mount_coord_to_skycoord(self, mount_coords): """Returns same coords""" return mount_coords def _skycoord_to_mount_coord(self, coords): """Returns same coords""" return [coords.ra, coords.dec] def _set_zero_position(self): """Sets the current position as the zero position.""" self.logger.debug("Simulator cannot set zero position") return False def _setup_commands(self, commands): self.commands = commands