"""Bisque/TheSkyX-based mount driver.
Implements a mount driver that communicates with Software Bisque's TheSkyX via
its TCP scripting interface, using small JavaScript templates for commands.
"""
import json
import os
import time
from string import Template
from threading import Lock
from astropy import units as u
from astropy.coordinates import SkyCoord
from panoptes.utils import error
from panoptes.pocs.mount import AbstractMount
from panoptes.pocs.utils import theskyx
[docs]
class Mount(AbstractMount):
"""Mount driver that proxies motions and queries to TheSkyX.
Uses small JavaScript template files to issue commands over TheSkyX's TCP
interface and parses JSON-like responses to update status and execute slews.
"""
def __init__(self, *args, **kwargs):
"""Initialize the Bisque mount driver and connect to TheSkyX client."""
super().__init__(*args, **kwargs)
self.theskyx = theskyx.TheSkyX()
self._command_lock = Lock()
template_dir = self.get_config("mount.template_dir")
if template_dir.startswith("/") is False:
template_dir = os.path.join(os.environ["POCS"], template_dir)
assert os.path.exists(template_dir), self.logger.warning(
"Bisque Mounts required a template directory"
)
self.template_dir = template_dir
##########################################################################
# Properties
##########################################################################
@property
def is_parked(self):
"""bool: Mount parked status."""
self._update_status()
return self._is_parked
@property
def is_home(self):
"""bool: Mount home status."""
self._update_status()
return self._is_home
@property
def is_tracking(self):
"""bool: Mount tracking status."""
self._update_status()
return self._is_tracking
@property
def is_slewing(self):
"""bool: Mount slewing status."""
self._update_status()
return self._is_slewing
@property
def at_mount_park(self):
"""bool: Mount slewing status."""
self._update_status()
return self._at_mount_park
##########################################################################
# Methods
##########################################################################
[docs]
def connect(self):
"""Connects to the mount via the serial port (`self._port`)
Returns:
bool: Returns the self.is_connected property which checks
the actual serial connection.
"""
self.logger.info("Connecting to mount")
self.write(self._get_command("connect"))
response = self.read()
self._is_connected = response["success"]
try:
self.logger.info(response["msg"])
except KeyError:
pass
return self.is_connected
[docs]
def disconnect(self):
"""Disconnect from TheSkyX and mark connection closed.
Returns:
bool: True if now disconnected.
"""
self.logger.debug("Disconnecting mount from TheSkyX")
self.query("disconnect")
self._is_connected = False
return not self.is_connected
[docs]
def initialize(self, unpark=False, *args, **kwargs):
"""Initialize the connection with the mount and setup for location.
If the mount is successfully initialized, the `_setup_location_for_mount` method
is also called.
Returns:
bool: Returns the value from `self.is_initialized`.
"""
if not self.is_connected:
self.connect()
if self.is_connected and not self.is_initialized:
self.logger.info(f"Initializing {__name__} mount")
# We trick the mount into thinking it's initialized while we
# initialize otherwise the `serial_query` method will test
# to see if initialized and be put into loop.
self._is_initialized = True
self._setup_location_for_mount()
if unpark:
self.unpark()
self.logger.info(f"Mount initialized: {self.is_initialized}")
return self.is_initialized
[docs]
def query(self, *args, **kwargs):
"""Override the query method to use the command lock.
This is required because TheSkyX cannot handle simulataneous commands. This function will
block until the lock is released.
"""
with self._command_lock:
return super().query(*args, **kwargs)
def _update_status(self):
"""Fetch current status from TheSkyX and cache key flags.
Returns:
dict: The merged status dictionary including coordinates.
"""
status = self.query("get_status")
try:
self._at_mount_park = status["parked"]
self._is_parked = status["parked"]
self._is_tracking = status["tracking"]
self._is_slewing = status["slewing"]
except KeyError:
self.logger.warning("Problem with status, key not found")
status.update(self.query("get_coordinates"))
self.logger.debug(f"Mount status: {status}")
return status
[docs]
def set_target_coordinates(self, coords):
"""Sets the RA and Dec for the mount's current target.
Args:
coords (astropy.coordinates.SkyCoord): coordinates specifying target location
Returns:
bool: Boolean indicating success
"""
# Reset current target
self._target_coordinates = None
target_set = False
if self.is_parked:
self.logger.info("Mount is parked")
else:
# Save the skycoord coordinates
self.logger.debug(f"Setting target coordinates: {coords}")
# Get coordinate format from mount specific class
mount_coords = self._skycoord_to_mount_coord(coords)
# Send coordinates to mount
try:
response = self.query(
"set_target_coordinates",
{
"ra": mount_coords[0],
"dec": mount_coords[1],
},
)
target_set = response["success"]
if target_set:
self._target_coordinates = coords
self.logger.debug(response["msg"])
else:
raise Exception(f"Problem setting mount coordinates: {mount_coords}")
except Exception as e:
self.logger.warning(e)
return target_set
[docs]
def set_park_position(self):
"""Send command to set current position as park in TheSkyX."""
self.query("set_park_position")
self.logger.info(f"Mount park position set: {self._park_coordinates}")
##########################################################################
# Movement methods
##########################################################################
[docs]
def slew_to_target(self, timeout=120, **kwargs):
"""Slews to the current _target_coordinates
Returns:
bool: indicating success
"""
success = False
if self.is_parked:
self.logger.info("Mount is parked")
elif not self.has_target:
self.logger.info("Target Coordinates not set")
else:
# Get coordinate format from mount specific class
mount_coords = self._skycoord_to_mount_coord(self._target_coordinates)
# Send coordinates to mount
self.logger.info(f"Slewing to target coordinates: {mount_coords}")
try:
response = self.query(
"slew_to_coordinates",
{"ra": mount_coords[0], "dec": mount_coords[1]},
timeout=timeout,
)
success = response["success"]
if success:
while self.is_slewing:
time.sleep(2)
else:
raise error.PanError(f"Slewing was unsuccessful: {response['response']}")
except Exception as e:
self.logger.warning(f"Problem slewing to mount coordinates: {mount_coords} {e}")
if success:
if not self.query("start_tracking")["success"]:
self.logger.warning("Tracking not turned on for target")
self._is_tracking = True
else:
self.logger.debug("Now tracking at target")
return success
[docs]
def slew_to_home(self, blocking=False, timeout=120):
"""Slews the mount to the home position.
Note:
Home position and Park position are not the same thing
Args:
blocking (bool, optional): If command should block while slewing to
home, default False.
timeout (int, optional): Timeout in seconds, default 120.
Returns:
bool: indicating success
"""
response = 0
if not self.is_parked:
self._target_coordinates = None
response = self.query("goto_home", timeout=timeout)
else:
self.logger.info("Mount is parked")
return response
[docs]
def slew_to_zero(self, blocking=False):
"""Calls `slew_to_home` in base class. Can be overridden."""
self.slew_to_home(blocking=blocking)
[docs]
def park(self, timeout=120):
"""Slews to the park position and parks the mount.
Note:
When mount is parked no movement commands will be accepted.
Returns:
bool: indicating success
"""
self.logger.debug("Parking mount")
response = self.query("park", timeout=timeout)
self._is_parked = response["success"]
return self.is_parked
[docs]
def unpark(self):
"""Unparks the mount. Does not do any movement commands but makes them available again.
Returns:
bool: indicating success
"""
response = self.query("unpark")
if response["success"]:
self._is_parked = False
self.logger.debug("Mount unparked")
else:
self.logger.warning("Problem with unpark of mount")
return response["success"]
[docs]
def move_direction(self, direction="north", seconds=1.0, arcmin=None, rate=None):
"""Move mount in specified `direction` for given amount of `seconds`"""
seconds = float(seconds)
assert direction in ["north", "south", "east", "west", "left", "right", "up", "down"]
move_command = f"move_{direction}"
self.logger.debug(f"Move command: {move_command}")
if rate is None:
rate = 15.04 # (u.arcsec / u.second)
if arcmin is None:
arcmin = (rate * seconds) / 60.0
try:
self.logger.debug(f"Moving {direction} for {arcmin} arcmins. ")
self.query(
move_command,
params={"direction": direction.upper()[0], "arcmin": arcmin},
timeout=seconds + 10,
)
except KeyboardInterrupt:
self.logger.warning("Keyboard interrupt, stopping movement.")
except Exception as e:
self.logger.warning(f"Problem moving command!! Make sure mount has stopped moving: {e}")
finally:
# Note: We do this twice. That's fine.
self.logger.debug("Stopping movement")
self.query("stop_moving")
##########################################################################
# Communication Methods
##########################################################################
[docs]
def write(self, value):
"""Send a JavaScript command string to TheSkyX.
Args:
value (str): JavaScript to execute on TheSkyX.
Returns:
None
"""
return self.theskyx.write(value)
[docs]
def read(self, timeout=5):
"""Read and parse a JSON-like response from TheSkyX.
Args:
timeout (int): Seconds to wait for a response before giving up.
Returns:
dict: A best-effort parsed object with 'response' and 'success' keys.
"""
response_obj = {"success": False}
response = self.theskyx.read(timeout=timeout)
if response is None:
return response_obj
try:
response_obj = json.loads(response)
except TypeError as e:
self.logger.warning(f"Error: {e}")
except json.JSONDecodeError as e:
response_obj = {"response": response, "success": False, "error": e}
return response_obj
##########################################################################
# Private Methods
##########################################################################
def _get_command(self, cmd, params=None):
"""Looks up appropriate command for telescope"""
cmd_info = self.commands.get(cmd)
try:
filename = cmd_info["file"]
except KeyError:
raise error.InvalidMountCommand("Command not found")
if filename.startswith("/") is False:
filename = os.path.join(self.template_dir, filename)
template = ""
try:
with open(filename) as f:
template = Template(f.read())
except Exception as e:
self.logger.warning(f"Problem reading TheSkyX template {filename}: {e}")
if params is None:
params = {}
params.setdefault("async", "true")
return template.safe_substitute(params)
def _setup_location_for_mount(self):
self.logger.warning("TheSkyX requires location to be set in the application")
def _mount_coord_to_skycoord(self, mount_coords):
"""
Converts between iOptron RA/Dec format and a SkyCoord
Args:
mount_coords (str): Coordinates as returned by mount
Returns:
astropy.SkyCoord: Mount coordinates as astropy SkyCoord with
EarthLocation included.
"""
if isinstance(mount_coords, dict):
ra = mount_coords["ra"]
dec = mount_coords["dec"]
else:
ra, dec = mount_coords.split(" ")
ra = (float(ra) * u.hourangle).to(u.degree)
dec = float(dec) * u.deg
coords = SkyCoord(ra, dec)
return coords
def _skycoord_to_mount_coord(self, coords):
"""
Converts between SkyCoord and a iOptron RA/Dec format.
`
TTTTTTTT(T) 0.01 arc-seconds
XXXXX(XXX) milliseconds
Command: “:SrXXXXXXXX#”
Defines the commanded right ascension, RA. Slew, calibrate
and park commands operate on the most recently defined
right ascension.
Command: “:SdsTTTTTTTT#”
Defines the commanded declination, Dec. Slew, calibrate and
park commands operate on the most recently defined declination.
`
@param coords astropy.coordinates.SkyCoord
@retval A tuple of RA/Dec coordinates
"""
ra = coords.ra.to(u.hourangle).to_string()
dec = coords.dec.to_string()
self.logger.debug(f"RA: {ra} \t Dec: {dec}")
mount_coords = (ra, dec)
return mount_coords