# Source code for panoptes.pocs.utils.theskyx
"""Lightweight socket client for TheSkyX scripting interface.
Provides a minimal wrapper (TheSkyX) to connect, write commands, and read
responses from a running TheSkyX instance over TCP.
"""
import socket
from panoptes.utils import error
from panoptes.pocs.utils.logger import get_logger
# [docs]  (documentation-page link marker; not part of the module)
class TheSkyX:
    """A socket connection for communicating with TheSkyX.

    The ``socket`` attribute holds the connected ``socket.socket`` instance,
    or ``None`` while no connection is open.
    """

    def __init__(self, host="localhost", port=3040, connect=True, *args, **kwargs):
        """Store the connection settings and optionally connect right away.

        Args:
            host (str): Host name or address to connect to. Defaults to "localhost".
            port (int): TCP port to connect to. Defaults to 3040.
            connect (bool): If True, call ``connect()`` during construction.
            *args: Accepted and ignored.
            **kwargs: Accepted and ignored.
        """
        self.logger = get_logger()
        self._host = host
        self._port = port
        # connect(), write() and read() all use the public ``socket``
        # attribute, so it is the one initialised here.
        self.socket = None
        self._is_connected = False
        if connect:
            self.connect()

    @property
    def is_connected(self):
        """Whether the socket is currently connected.

        Returns:
            bool: True if a TCP connection is established.
        """
        return self._is_connected

    def connect(self):
        """Establish a TCP connection to TheSkyX.

        Does nothing if already connected. A refused connection is logged as
        a warning and leaves the instance disconnected; any other ``OSError``
        from the connection attempt is re-raised after the socket is closed.
        """
        self.logger.debug(f"Making TheSkyX connection at {self._host}:{self._port}")
        if self.is_connected:
            return

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self._host, self._port))
        except ConnectionRefusedError:
            # Release the descriptor and keep ``self.socket`` as None so that
            # later write()/read() calls report BadConnection.
            sock.close()
            self.logger.warning("Cannot create connection to TheSkyX")
        except OSError:
            sock.close()
            raise
        else:
            self.socket = sock
            self._is_connected = True
            self.logger.info(f"Connected to TheSkyX via {self._host}:{self._port}")

    def write(self, value):
        """Send a command string to TheSkyX.

        Args:
            value (str): The command to send.

        Raises:
            AssertionError: If ``value`` is not a ``str``.
            panoptes.utils.error.BadConnection: If not connected.
        """
        if not isinstance(value, str):
            # AssertionError (rather than TypeError) keeps the exception type
            # callers already see; an explicit raise survives ``python -O``.
            raise AssertionError(
                f"TheSkyX command must be a str, got {type(value).__name__}"
            )

        sock = getattr(self, "socket", None)
        if sock is None:
            raise error.BadConnection("Not connected to TheSkyX")

        sock.sendall(value.encode())

    def read(self, timeout=5):
        """Read a response from TheSkyX.

        Reads up to 2048 bytes in a single ``recv`` call. A response of the
        form ``<payload>|<status>`` is split at the last ``|``; a payload
        containing ``Error:`` is split at its first ``:`` and the remainder is
        treated as the error text.

        Args:
            timeout (int | float): Seconds to wait before timing out. Defaults to 5.

        Returns:
            str: The response payload. This is an empty string when ``recv``
                returned no data.

        Raises:
            panoptes.utils.error.BadConnection: If not connected.
            panoptes.utils.error.TheSkyXTimeout: If a socket timeout occurs.
            panoptes.utils.error.TheSkyXKeyError: If a key-related error is reported.
            panoptes.utils.error.TheSkyXError: For other error responses from TheSkyX.
        """
        sock = getattr(self, "socket", None)
        if sock is None:
            raise error.BadConnection("Not connected to TheSkyX")

        sock.settimeout(timeout)
        try:
            response = sock.recv(2048).decode()
        except socket.timeout as exc:  # pragma: no cover
            # ``socket.timeout`` is only an alias of TimeoutError on
            # Python >= 3.10; naming it directly works on older versions too.
            raise error.TheSkyXTimeout() from exc

        err = None
        if "|" in response:
            # Split once from the right so a "|" inside the payload cannot
            # break the two-way unpacking (assumes the status trails the
            # payload -- TODO confirm against TheSkyX protocol docs).
            response, err = response.rsplit("|", 1)
        if "Error:" in response:
            # Split once so colons inside the error text are preserved.
            response, err = response.split(":", 1)

        if err is not None and "No error" not in err:
            if "Error = 303" in err:
                raise error.TheSkyXKeyError("Invalid TheSkyX key")
            raise error.TheSkyXError(err)

        return response