Source code for panoptes.pocs.mount.bisque

"""Bisque/TheSkyX-based mount driver.

Implements a mount driver that communicates with Software Bisque's TheSkyX via
its TCP scripting interface, using small JavaScript templates for commands.
"""

import json
import os
import time
from string import Template
from threading import Lock

from astropy import units as u
from astropy.coordinates import SkyCoord

from panoptes.utils import error

from panoptes.pocs.mount import AbstractMount
from panoptes.pocs.utils import theskyx


[docs] class Mount(AbstractMount): """Mount driver that proxies motions and queries to TheSkyX. Uses small JavaScript template files to issue commands over TheSkyX's TCP interface and parses JSON-like responses to update status and execute slews. """ def __init__(self, *args, **kwargs): """Initialize the Bisque mount driver and connect to TheSkyX client.""" super().__init__(*args, **kwargs) self.theskyx = theskyx.TheSkyX() self._command_lock = Lock() template_dir = self.get_config("mount.template_dir") if template_dir.startswith("/") is False: template_dir = os.path.join(os.environ["POCS"], template_dir) assert os.path.exists(template_dir), self.logger.warning( "Bisque Mounts required a template directory" ) self.template_dir = template_dir ########################################################################## # Properties ########################################################################## @property def is_parked(self): """bool: Mount parked status.""" self._update_status() return self._is_parked @property def is_home(self): """bool: Mount home status.""" self._update_status() return self._is_home @property def is_tracking(self): """bool: Mount tracking status.""" self._update_status() return self._is_tracking @property def is_slewing(self): """bool: Mount slewing status.""" self._update_status() return self._is_slewing @property def at_mount_park(self): """bool: Mount slewing status.""" self._update_status() return self._at_mount_park ########################################################################## # Methods ##########################################################################
[docs] def connect(self): """Connects to the mount via the serial port (`self._port`) Returns: bool: Returns the self.is_connected property which checks the actual serial connection. """ self.logger.info("Connecting to mount") self.write(self._get_command("connect")) response = self.read() self._is_connected = response["success"] try: self.logger.info(response["msg"]) except KeyError: pass return self.is_connected
[docs] def disconnect(self): """Disconnect from TheSkyX and mark connection closed. Returns: bool: True if now disconnected. """ self.logger.debug("Disconnecting mount from TheSkyX") self.query("disconnect") self._is_connected = False return not self.is_connected
[docs] def initialize(self, unpark=False, *args, **kwargs): """Initialize the connection with the mount and setup for location. If the mount is successfully initialized, the `_setup_location_for_mount` method is also called. Returns: bool: Returns the value from `self.is_initialized`. """ if not self.is_connected: self.connect() if self.is_connected and not self.is_initialized: self.logger.info(f"Initializing {__name__} mount") # We trick the mount into thinking it's initialized while we # initialize otherwise the `serial_query` method will test # to see if initialized and be put into loop. self._is_initialized = True self._setup_location_for_mount() if unpark: self.unpark() self.logger.info(f"Mount initialized: {self.is_initialized}") return self.is_initialized
[docs] def query(self, *args, **kwargs): """Override the query method to use the command lock. This is required because TheSkyX cannot handle simulataneous commands. This function will block until the lock is released. """ with self._command_lock: return super().query(*args, **kwargs)
def _update_status(self): """Fetch current status from TheSkyX and cache key flags. Returns: dict: The merged status dictionary including coordinates. """ status = self.query("get_status") try: self._at_mount_park = status["parked"] self._is_parked = status["parked"] self._is_tracking = status["tracking"] self._is_slewing = status["slewing"] except KeyError: self.logger.warning("Problem with status, key not found") status.update(self.query("get_coordinates")) self.logger.debug(f"Mount status: {status}") return status
[docs] def set_target_coordinates(self, coords): """Sets the RA and Dec for the mount's current target. Args: coords (astropy.coordinates.SkyCoord): coordinates specifying target location Returns: bool: Boolean indicating success """ # Reset current target self._target_coordinates = None target_set = False if self.is_parked: self.logger.info("Mount is parked") else: # Save the skycoord coordinates self.logger.debug(f"Setting target coordinates: {coords}") # Get coordinate format from mount specific class mount_coords = self._skycoord_to_mount_coord(coords) # Send coordinates to mount try: response = self.query( "set_target_coordinates", { "ra": mount_coords[0], "dec": mount_coords[1], }, ) target_set = response["success"] if target_set: self._target_coordinates = coords self.logger.debug(response["msg"]) else: raise Exception(f"Problem setting mount coordinates: {mount_coords}") except Exception as e: self.logger.warning(e) return target_set
[docs] def set_park_position(self): """Send command to set current position as park in TheSkyX.""" self.query("set_park_position") self.logger.info(f"Mount park position set: {self._park_coordinates}")
########################################################################## # Movement methods ##########################################################################
[docs] def slew_to_target(self, timeout=120, **kwargs): """Slews to the current _target_coordinates Returns: bool: indicating success """ success = False if self.is_parked: self.logger.info("Mount is parked") elif not self.has_target: self.logger.info("Target Coordinates not set") else: # Get coordinate format from mount specific class mount_coords = self._skycoord_to_mount_coord(self._target_coordinates) # Send coordinates to mount self.logger.info(f"Slewing to target coordinates: {mount_coords}") try: response = self.query( "slew_to_coordinates", {"ra": mount_coords[0], "dec": mount_coords[1]}, timeout=timeout, ) success = response["success"] if success: while self.is_slewing: time.sleep(2) else: raise error.PanError(f"Slewing was unsuccessful: {response['response']}") except Exception as e: self.logger.warning(f"Problem slewing to mount coordinates: {mount_coords} {e}") if success: if not self.query("start_tracking")["success"]: self.logger.warning("Tracking not turned on for target") self._is_tracking = True else: self.logger.debug("Now tracking at target") return success
[docs] def slew_to_home(self, blocking=False, timeout=120): """Slews the mount to the home position. Note: Home position and Park position are not the same thing Args: blocking (bool, optional): If command should block while slewing to home, default False. timeout (int, optional): Timeout in seconds, default 120. Returns: bool: indicating success """ response = 0 if not self.is_parked: self._target_coordinates = None response = self.query("goto_home", timeout=timeout) else: self.logger.info("Mount is parked") return response
[docs] def slew_to_zero(self, blocking=False): """Calls `slew_to_home` in base class. Can be overridden.""" self.slew_to_home(blocking=blocking)
[docs] def park(self, timeout=120): """Slews to the park position and parks the mount. Note: When mount is parked no movement commands will be accepted. Returns: bool: indicating success """ self.logger.debug("Parking mount") response = self.query("park", timeout=timeout) self._is_parked = response["success"] return self.is_parked
[docs] def unpark(self): """Unparks the mount. Does not do any movement commands but makes them available again. Returns: bool: indicating success """ response = self.query("unpark") if response["success"]: self._is_parked = False self.logger.debug("Mount unparked") else: self.logger.warning("Problem with unpark of mount") return response["success"]
[docs] def move_direction(self, direction="north", seconds=1.0, arcmin=None, rate=None): """Move mount in specified `direction` for given amount of `seconds`""" seconds = float(seconds) assert direction in ["north", "south", "east", "west", "left", "right", "up", "down"] move_command = f"move_{direction}" self.logger.debug(f"Move command: {move_command}") if rate is None: rate = 15.04 # (u.arcsec / u.second) if arcmin is None: arcmin = (rate * seconds) / 60.0 try: self.logger.debug(f"Moving {direction} for {arcmin} arcmins. ") self.query( move_command, params={"direction": direction.upper()[0], "arcmin": arcmin}, timeout=seconds + 10, ) except KeyboardInterrupt: self.logger.warning("Keyboard interrupt, stopping movement.") except Exception as e: self.logger.warning(f"Problem moving command!! Make sure mount has stopped moving: {e}") finally: # Note: We do this twice. That's fine. self.logger.debug("Stopping movement") self.query("stop_moving")
########################################################################## # Communication Methods ##########################################################################
[docs] def write(self, value): """Send a JavaScript command string to TheSkyX. Args: value (str): JavaScript to execute on TheSkyX. Returns: None """ return self.theskyx.write(value)
[docs] def read(self, timeout=5): """Read and parse a JSON-like response from TheSkyX. Args: timeout (int): Seconds to wait for a response before giving up. Returns: dict: A best-effort parsed object with 'response' and 'success' keys. """ response_obj = {"success": False} response = self.theskyx.read(timeout=timeout) if response is None: return response_obj try: response_obj = json.loads(response) except TypeError as e: self.logger.warning(f"Error: {e}") except json.JSONDecodeError as e: response_obj = {"response": response, "success": False, "error": e} return response_obj
########################################################################## # Private Methods ########################################################################## def _get_command(self, cmd, params=None): """Looks up appropriate command for telescope""" cmd_info = self.commands.get(cmd) try: filename = cmd_info["file"] except KeyError: raise error.InvalidMountCommand("Command not found") if filename.startswith("/") is False: filename = os.path.join(self.template_dir, filename) template = "" try: with open(filename) as f: template = Template(f.read()) except Exception as e: self.logger.warning(f"Problem reading TheSkyX template {filename}: {e}") if params is None: params = {} params.setdefault("async", "true") return template.safe_substitute(params) def _setup_location_for_mount(self): self.logger.warning("TheSkyX requires location to be set in the application") def _mount_coord_to_skycoord(self, mount_coords): """ Converts between iOptron RA/Dec format and a SkyCoord Args: mount_coords (str): Coordinates as returned by mount Returns: astropy.SkyCoord: Mount coordinates as astropy SkyCoord with EarthLocation included. """ if isinstance(mount_coords, dict): ra = mount_coords["ra"] dec = mount_coords["dec"] else: ra, dec = mount_coords.split(" ") ra = (float(ra) * u.hourangle).to(u.degree) dec = float(dec) * u.deg coords = SkyCoord(ra, dec) return coords def _skycoord_to_mount_coord(self, coords): """ Converts between SkyCoord and a iOptron RA/Dec format. ` TTTTTTTT(T) 0.01 arc-seconds XXXXX(XXX) milliseconds Command: “:SrXXXXXXXX#” Defines the commanded right ascension, RA. Slew, calibrate and park commands operate on the most recently defined right ascension. Command: “:SdsTTTTTTTT#” Defines the commanded declination, Dec. Slew, calibrate and park commands operate on the most recently defined declination. ` @param coords astropy.coordinates.SkyCoord @retval A tuple of RA/Dec coordinates """ ra = coords.ra.to(u.hourangle).to_string() dec = coords.dec.to_string() self.logger.debug(f"RA: {ra} \t Dec: {dec}") mount_coords = (ra, dec) return mount_coords