Source code for panoptes.pocs.utils.theskyx

"""Lightweight socket client for TheSkyX scripting interface.

Provides a minimal wrapper (TheSkyX) to connect, write commands, and read
responses from a running TheSkyX instance over TCP.
"""

import socket

from panoptes.utils import error

from panoptes.pocs.utils.logger import get_logger


class TheSkyX:
    """A socket connection for communicating with TheSkyX.

    The connection state is tracked on the instance: ``socket`` holds the
    connected socket object (or ``None`` when there is no connection) and
    ``is_connected`` reports whether ``connect`` has succeeded.
    """

    def __init__(self, host="localhost", port=3040, connect=True, *args, **kwargs):
        """Store the connection parameters and optionally connect.

        Args:
            host (str): Host to connect to. Defaults to "localhost".
            port (int): TCP port to connect to. Defaults to 3040.
            connect (bool): If True (default), call ``connect`` immediately.
            *args: Accepted for call compatibility; unused here.
            **kwargs: Accepted for call compatibility; unused here.
        """
        self.logger = get_logger()

        self._host = host
        self._port = port

        # The rest of the class reads ``self.socket``; initialise that same
        # attribute so "not connected" is an explicit ``None`` rather than a
        # missing attribute.
        self.socket = None

        self._is_connected = False
        if connect:
            self.connect()

    @property
    def is_connected(self):
        """Whether the socket is currently connected.

        Returns:
            bool: True if a TCP connection is established.
        """
        return self._is_connected

    def connect(self):
        """Establish a TCP connection to TheSkyX.

        A refused connection is logged as a warning and leaves the instance
        disconnected. Any other ``OSError`` raised while connecting is
        propagated to the caller. In both cases the partially created socket
        is closed so no file descriptor is leaked.
        """
        self.logger.debug(f"Making TheSkyX connection at {self._host}:{self._port}")
        if self.is_connected:
            return

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self._host, self._port))
        except ConnectionRefusedError:
            sock.close()
            self.logger.warning("Cannot create connection to TheSkyX")
        except OSError:
            sock.close()
            raise
        else:
            # Only publish the socket once it is actually connected so that
            # ``write``/``read`` never operate on a half-open socket.
            self.socket = sock
            self._is_connected = True
            self.logger.info(f"Connected to TheSkyX via {self._host}:{self._port}")

    def write(self, value):
        """Send a command string to TheSkyX.

        Args:
            value (str): The command to send.

        Raises:
            TypeError: If ``value`` is not a string.
            panoptes.utils.error.BadConnection: If not connected.
        """
        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O`` and a non-str would then be misreported as a
        # connection problem.
        if not isinstance(value, str):
            raise TypeError(f"value must be a str, got {type(value).__name__}")

        sock = getattr(self, "socket", None)
        if sock is None:
            raise error.BadConnection("Not connected to TheSkyX")

        sock.sendall(value.encode())

    def read(self, timeout=5):
        """Read a response from TheSkyX.

        The reply is split at the last ``|`` into the result and a trailing
        status field. If the result contains ``Error:``, it is further split
        at the first ``:`` and the remainder is treated as the error text.

        Args:
            timeout (int | float): Seconds to wait before timing out. Defaults to 5.

        Returns:
            str: The response text with the status field removed (an empty
                string if nothing was received).

        Raises:
            panoptes.utils.error.BadConnection: If not connected.
            panoptes.utils.error.TheSkyXTimeout: If a socket timeout occurs.
            panoptes.utils.error.TheSkyXKeyError: If a key-related error is reported.
            panoptes.utils.error.TheSkyXError: For other error responses from TheSkyX.
        """
        sock = getattr(self, "socket", None)
        if sock is None:
            raise error.BadConnection("Not connected to TheSkyX")

        sock.settimeout(timeout)
        try:
            raw = sock.recv(2048)
        except TimeoutError as timeout_err:  # pragma: no cover
            raise error.TheSkyXTimeout() from timeout_err

        response = raw.decode()
        err = None

        # ``rpartition``/``partition`` never raise on extra separators,
        # unlike unpacking the result of an unbounded ``split``.
        if "|" in response:
            response, _, err = response.rpartition("|")

        if "Error:" in response:
            response, _, err = response.partition(":")

        if err is not None and "No error" not in err:
            if "Error = 303" in err:
                raise error.TheSkyXKeyError("Invalid TheSkyX key")
            raise error.TheSkyXError(err)

        return response