Source code for panoptes.pocs.focuser.serial

"""Abstract serial focuser base.

Provides AbstractSerialFocuser, a small helper that wires up a SerialData
connection and common connect/reconnect/cleanup behavior for serial-backed
focusers.
"""

from abc import abstractmethod
from contextlib import suppress

from panoptes.utils.rs232 import SerialData

from panoptes.pocs.focuser import AbstractFocuser


[docs] class AbstractSerialFocuser(AbstractFocuser): """Base class for serial-connected focusers. Handles serial port lifecycle and basic movement flags shared by concrete serial focusers; concrete subclasses must implement _initialize(). """ # Class variable to cache the device node scanning results _adaptor_nodes = None # Class variable to store the device nodes already in use. Prevents scanning # known focuser devices & acts as a check against adaptors assigned to incorrect ports. _assigned_nodes = [] def __init__(self, baudrate=None, initial_position=None, *args, **kwargs): """ Args: initial_position (int, optional): If provided, move to this position after initializing. baudrate (int, optional): The baudrate of the serial device. Default: None. **kwargs: Parsed to AbstractFocuser init function. """ super().__init__(*args, **kwargs) self.baudrate = baudrate self._is_moving = False # Check that this node hasn't already been assigned to another focuser device if self.port in AbstractSerialFocuser._assigned_nodes: self.logger.error(f"Device node {self.port} already in use!") return # Connect to the serial device try: self.connect(port=self.port, baudrate=self.baudrate) except Exception as err: self.logger.error(f"Error connecting to {self.name} on {self.port}: {err!r}") return # Initialize the focuser self._initialize() AbstractSerialFocuser._assigned_nodes.append(self.port) self.logger.info(f"Successfully initialized {self}.") # Move to the initial position # TODO: Move this to Focuser base class? if initial_position is not None: self.logger.info(f"Initial position for {self}: {initial_position}") self.move_to(initial_position) def __del__(self): with suppress(AttributeError): device_node = self.port AbstractSerialFocuser._assigned_nodes.remove(device_node) self.logger.debug(f"Removed {device_node} from assigned nodes list") with suppress(AttributeError): self._serial.close() self.logger.debug(f"Closed serial port {self._port}") # Properties @property def is_connected(self): """True if the focuser serial device is currently connected.""" if self._serial: return self._serial.is_connected return False @property def is_moving(self): """True if the focuser is currently moving.""" return self._is_moving # Methods
[docs] def connect(self, *args, **kwargs): """Connect to the serial device. Args: *args, **kwargs: Parsed to SerialData. """ self._serial = SerialData(*args, **kwargs)
[docs] def reconnect(self): """Close and open serial port and reconnect to focuser.""" self.logger.debug(f"Attempting to reconnect to {self}.") self.__del__() self.connect(port=self.port)
# Private Methods @abstractmethod def _initialize(self): """Device - specific initialization.""" raise NotImplementedError