# Source code for panoptes.pocs.dome.bisque

"""Bisque dome controller using TheSkyX scripting interface.

Provides a Dome implementation that communicates with TheSkyX over TCP using
script templates to perform operations such as connect, open/close slit, park,
unpark, and query status/position.
"""

import os
import time
from string import Template

from panoptes.utils import error
from panoptes.utils.serializers import from_json

from panoptes.pocs import dome
from panoptes.pocs.utils import theskyx


class Dome(dome.AbstractDome):
    """Dome controller backed by TheSkyX.

    Uses TheSkyX's TCP scripting interface with small JavaScript templates to
    perform actions and read status. Each operation renders a template from
    ``template_dir`` via ``_get_command``, sends it with ``write`` and, where a
    reply is needed, parses it with ``read``.
    """

    # Labels for the numeric code found under the "msg" key of a
    # 'dome/slit_state.js' reply. Codes 3 and 4 share the labels of 1 and 2;
    # any code not listed here is reported as "Unknown".
    _SLIT_STATES = {
        0: "Unknown",
        1: "Open",
        2: "Closed",
        3: "Open",
        4: "Closed",
    }

    def __init__(self, *args, **kwargs):
        """Initialize the Bisque/TheSkyX dome controller.

        Args:
            *args: Forwarded to AbstractDome.
            **kwargs: Forwarded to AbstractDome. May include 'template_dir' to
                override the configured 'dome.template_dir' scripts directory.
                A value that does not start with '/' is joined onto the
                directory named by the POCS environment variable.

        Raises:
            AssertionError: If the resolved template directory does not exist.
            KeyError: If the template directory is relative and the POCS
                environment variable is not set.
        """
        super().__init__(*args, **kwargs)
        self.theskyx = theskyx.TheSkyX()

        template_dir = kwargs.get("template_dir", self.get_config("dome.template_dir"))
        if not template_dir.startswith("/"):
            template_dir = os.path.join(os.environ["POCS"], template_dir)

        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O`, and the old assert message was the None returned by
        # logger.warning. AssertionError is kept so existing handlers still match.
        if not os.path.exists(template_dir):
            self.logger.warning("Bisque Mounts required a template directory")
            raise AssertionError(f"TheSkyX template directory not found: {template_dir}")

        self.template_dir = template_dir
        self._is_parked = True
        self._is_connected = False

    @property
    def is_connected(self):
        """Whether the last connect/disconnect exchange left the dome connected.

        Returns:
            bool: The cached flag set by ``connect`` and ``disconnect``.
        """
        return self._is_connected

    @property
    def is_open(self):
        """Whether the dome slit is open.

        Returns:
            bool: True if ``read_slit_state`` returns 'Open'.
        """
        return self.read_slit_state() == "Open"

    @property
    def is_closed(self):
        """Whether the dome slit is closed.

        Returns:
            bool: True if ``read_slit_state`` returns 'Closed'.
        """
        return self.read_slit_state() == "Closed"

    def read_slit_state(self):
        """Query TheSkyX for the current slit state and return a label.

        Returns:
            str: One of 'Open', 'Closed', 'Unknown', or 'Disconnected'.
                'Disconnected' is returned without contacting TheSkyX when the
                dome is not connected.

        Raises:
            KeyError: If the parsed reply has no 'msg' key, which includes the
                fallback object ``read`` returns when the reply cannot be parsed.
        """
        if not self.is_connected:
            return "Disconnected"

        self.write(self._get_command("dome/slit_state.js"))
        response = self.read()
        return self._SLIT_STATES.get(response["msg"], "Unknown")

    @property
    def status(self):
        """Return a status mapping from TheSkyX.

        Returns:
            dict: Parsed response from the 'dome/status.js' template.
        """
        self.write(self._get_command("dome/status.js"))
        return self.read()

    @property
    def position(self):
        """Return current dome position as reported by TheSkyX.

        Returns:
            dict: Parsed response from the 'dome/position.js' template.
        """
        self.write(self._get_command("dome/position.js"))
        return self.read()

    @property
    def is_parked(self):
        """Whether the dome is parked.

        Returns:
            bool: The cached flag; True after construction and afterwards
                updated by ``park``, ``unpark`` and ``find_home``.
        """
        return self._is_parked

    def connect(self):
        """Connect to the dome controller via TheSkyX.

        Does nothing if already connected.

        Returns:
            bool: True if the dome is connected after the call.
        """
        if not self.is_connected:
            self.write(self._get_command("dome/connect.js"))
            response = self.read()
            self._is_connected = response["success"]

        return self.is_connected

    def disconnect(self):
        """Disconnect from TheSkyX, closing the slit first if necessary.

        Returns:
            bool: True if now disconnected.
        """
        if self.is_connected:
            # Do not leave the slit open once the dome can no longer be commanded.
            if self.is_open:
                self.close()

            self.write(self._get_command("dome/disconnect.js"))
            response = self.read()

            if response["success"]:
                self._is_connected = False

        return not self.is_connected

    def open(self):
        """Command the dome to open the slit and wait until it reports open.

        The command is only sent when the slit currently reads as closed. The
        wait polls the slit state once per second and has no upper bound.

        Returns:
            bool: True if the slit ends up open.
        """
        if self.is_closed:
            self.logger.debug("Opening slit on dome")
            self.write(self._get_command("dome/open_slit.js"))

            while not self.is_open:
                self.logger.debug("Waiting for slit to open")
                time.sleep(1)

        return self.is_open

    def close(self):
        """Command the dome to close the slit and wait until it stops reading open.

        The command is only sent when the slit currently reads as open. The
        wait polls the slit state once per second and has no upper bound.

        Returns:
            bool: True if the slit ends up closed.
        """
        if self.is_open:
            self.logger.debug("Closing slit on dome")
            self.write(self._get_command("dome/close_slit.js"))

            while self.is_open:
                self.logger.debug("Waiting for slit to close")
                time.sleep(1)

        return self.is_closed

    def park(self):
        """Park the dome via TheSkyX and update internal state.

        Nothing is sent when the dome is not connected.

        Returns:
            bool: The parked flag after the call.
        """
        if self.is_connected:
            self.write(self._get_command("dome/park.js"))
            response = self.read()
            self._is_parked = response["success"]

        return self.is_parked

    def unpark(self):
        """Unpark the dome via TheSkyX and update internal state.

        Nothing is sent when the dome is not connected.

        Returns:
            bool: True if the dome is unparked after the call.
        """
        if self.is_connected:
            self.write(self._get_command("dome/unpark.js"))
            response = self.read()
            self._is_parked = not response["success"]

        return not self.is_parked

    def find_home(self):
        """Command TheSkyX to find the dome's home position.

        A successful reply marks the dome as parked. Nothing is sent when the
        dome is not connected.

        Returns:
            bool: The parked flag after the call; the last known value if the
                dome is not connected.
        """
        if self.is_connected:
            self.write(self._get_command("dome/home.js"))
            response = self.read()
            self._is_parked = response["success"]

        return self.is_parked

    ##################################################################################################
    # Communication Methods
    ##################################################################################################

    def write(self, value):
        """Send a script to TheSkyX.

        Args:
            value (str): JavaScript command to execute.

        Returns:
            Whatever the underlying TheSkyX client's ``write`` returns.
        """
        return self.theskyx.write(value)

    def read(self, timeout=5):
        """Read and parse a response from TheSkyX with a simple timeout loop.

        Polls the TheSkyX client about once per second until it returns
        something other than None or the timeout is used up.

        Args:
            timeout (int): Approximate number of seconds to wait before giving
                up. Zero or a negative value means a single attempt.

        Returns:
            The object parsed from the reply by ``from_json``. If the reply is
            missing or cannot be parsed, a warning is logged and the fallback
            ``{"response": <raw reply>, "success": False}`` is returned.
        """
        while True:
            response = self.theskyx.read()
            # `<=` rather than `==` so that negative or fractional timeouts
            # cannot step past zero and poll forever.
            if response is not None or timeout <= 0:
                break
            time.sleep(1)
            timeout -= 1

        # Fallback used when the reply cannot be parsed.
        response_obj = {
            "response": response,
            "success": False,
        }

        try:
            response_obj = from_json(response)
        except (TypeError, error.InvalidDeserialization) as e:
            self.logger.warning(f"Error: {e!r}: {response}")

        return response_obj

    ##################################################################################################
    # Private Methods
    ##################################################################################################

    def _get_command(self, filename, params=None):
        """Render the TheSkyX script template stored in ``filename``.

        Args:
            filename (str): Template path. A value that does not start with '/'
                is resolved against ``self.template_dir``.
            params (dict, optional): Substitutions for the template's
                ``$placeholders``. 'async' defaults to 'true' when not given.
                The mapping passed in is not modified.

        Returns:
            str: The rendered script. Placeholders with no substitution are
                left as-is. If the template cannot be read, a warning is
                logged and an empty string is returned.
        """
        if not filename.startswith("/"):
            filename = os.path.join(self.template_dir, filename)

        # Start from an empty Template (not a plain str) so the best-effort
        # fallback below still has a safe_substitute() method.
        template = Template("")
        try:
            with open(filename) as f:
                template = Template(f.read())
        except (OSError, ValueError) as e:
            self.logger.warning(f"Problem reading TheSkyX template {filename}: {e}")

        # Work on a copy so the caller's mapping is never mutated.
        substitutions = dict(params) if params else {}
        substitutions.setdefault("async", "true")

        return template.safe_substitute(substitutions)