"""Optec FocusLynx focuser driver.
Implements control for Optec FocusLynx-compatible focusers (including Focus Boss II)
via a serial connection, following the AbstractFocuser interface used by POCS.
"""
import time
from contextlib import suppress
from warnings import warn
import astropy.units as u
import serial
from panoptes.pocs.focuser import AbstractFocuser
[docs]
class Focuser(AbstractFocuser):
"""
Focuser class for control of telescope focusers using the Optec FocusLynx focus controller.
This includes the Starlight Instruments Focus Boss II controller, which is "powered by Optec"
Args:
port (str): device node of the serial port the focuser controller is connected to, e.g.
'/dev/ttyUSB0'
name (str, optional): default 'FocusLynx Focuser'
initial_position (int, optional): if given the focuser will drive to this encoder position
following initialisation.
focuser_number (int, optional): for focus controllers that support more than one focuser
set this number to specify which focuser should be controlled by this object. Default 1
min_position (int, optional): minimum allowed focuser position in encoder units, default 0
max_position (int, optional): maximum allowed focuser position in encoder units. If not
given the value will be taken from the focuser's internal config.
Additional positional and keyword arguments are passed to the base class, AbstractFocuser. See
that class for a complete list.
"""
def __init__(
self,
port,
name="FocusLynx Focuser",
initial_position=None,
focuser_number=1,
min_position=0,
max_position=None,
*args,
**kwargs,
):
super().__init__(port=port, name=name, *args, **kwargs)
self.logger.debug("Initialising FocusLynx focuser")
try:
self.connect()
except (serial.SerialException, serial.SerialTimeoutException) as err:
message = f"Error connecting to {self.name} on {port}: {err}"
self.logger.error(message)
warn(message)
return
self._focuser_number = focuser_number
self._initialise()
if min_position >= 0:
self._min_position = int(min_position)
else:
self._min_position = 0
message = f"Specified min_position {min_position} less than zero, ignoring!"
warn(message)
if max_position is not None:
if max_position <= self._max_position:
if max_position > self._min_position:
self._max_position = int(max_position)
else:
raise ValueError("Max position must be greater than min position!")
else:
self.logger.warning(
"Specified max_position {} greater than focuser max {}!",
max_position,
self._max_position,
)
if initial_position is not None:
self.position = initial_position
def __del__(self):
with suppress(AttributeError):
self._serial_port.close()
self.logger.debug(f"Closed serial port {self._port}")
def __str__(self):
return f"{self.name} {self._focuser_number} ({self.uid}) on {self.port}"
##################################################################################################
# Properties
##################################################################################################
@property
def uid(self):
"""
The user set 'nickname' of the focuser. Must be <= 16 characters
"""
try:
uid = self._focuser_config["Nickname"]
except AttributeError:
uid = self.port
return uid
@uid.setter
def uid(self, nickname):
"""Set the user-defined nickname for this focuser.
Args:
nickname (str): Nickname string (<= 16 characters).
"""
if len(nickname) > 16:
self.logger.warning(
"Truncated nickname {} to {} (must be <= 16 characters)", nickname, nickname[:16]
)
nickname = nickname[:16]
command_str = f"<F{self._focuser_number:1d}SCNN{nickname}>"
self._send_command(command_str, expected_reply="SET")
self._get_focuser_config()
@property
def is_connected(self):
"""
Checks status of serial port to determine if connected.
"""
connected = False
if self._serial_port:
connected = self._serial_port.isOpen()
return connected
@AbstractFocuser.position.getter
def position(self):
"""
Current focus position in encoder units
"""
self._update_focuser_status()
return self._position
@property
def min_position(self):
"""
Position of close limit of focus travel, in encoder units
"""
return self._min_position
@property
def max_position(self):
"""
Position of far limit of focus travel, in encoder units
"""
return self._max_position
@property
def firmware_version(self):
"""
Firmware version of the focuser controller
"""
return self._hub_info["Hub FVer"]
@property
def hardware_version(self):
"""
Device type code of the focuser
"""
return self.model
@property
def temperature(self):
"""
Current temperature of the focuser, in degrees Celsus, as an astropy.units.Quantity
"""
self._update_focuser_status()
return self._temperature * u.Celsius
@property
def is_moving(self):
"""
True if the focuser is currently moving
"""
self._update_focuser_status()
return self._is_moving
#################################################################################################
# Methods
##################################################################################################
[docs]
def connect(self):
"""Open the serial connection to the FocusLynx controller.
Establishes a 115200 8N1 serial connection to the configured port and
assigns the handle to self._serial_port. Raises SerialException on failure.
"""
try:
# Configure serial port.
self._serial_port = serial.Serial(
port=self.port,
baudrate=115200,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0,
)
except serial.SerialException as err:
self._serial_port = None
self.logger.critical(f"Could not open {self.port}!")
raise err
self.logger.debug(f"Established serial connection to {self.name} on {self.port}")
[docs]
def move_to(self, position, blocking=True):
"""
Moves focuser to a new position.
Args:
position (int): new focuser position, in encoder units. Must be between min_position
and max_position.
blocking (bool, optional): If True (default) will block until the move is complete,
otherwise will return immediately.
Returns:
int: focuser position following the move. If blocking is True this will be the actual
focuser position, if False it will be the target position.
"""
position = int(position)
if position < self._min_position:
self.logger.error(
"Requested position {} less than min position, moving to {}!",
position,
self._min_position,
)
position = self._min_position
elif position > self._max_position:
self.logger.error(
"Requested position {} greater than max position, moving to {}!",
position,
self._max_position,
)
position = self._max_position
self.logger.debug(f"Moving focuser {self.uid} to {position}")
command_str = f"<F{self._focuser_number:1d}MA{position:06d}>"
self._send_command(command_str, expected_reply="M")
# Focuser move commands are non-blocking. Only option is polling is_moving
if blocking:
while self.is_moving:
time.sleep(1)
if self.position != self._target_position:
self.logger.warning(
"Focuser {} did not reach target position {}, now at {}!",
self.uid,
self._target_position,
self._position,
)
return self._position
else:
return position
[docs]
def move_by(self, increment, blocking=True):
"""
Moves focuser by a given amount.
Args:
increment (int): distance to move the focuser, in encoder units. New position must be
between min_position and max_position.
blocking (bool, optional): If True (default) will block until the move is complete,
otherwise will return immediately.
Returns:
int: focuser position following the move. If blocking is True this will be the actual
focuser position, if False it will be the target position.
"""
return self.move_to(self.position + increment)
[docs]
def halt(self):
"""
Causes the focuser to immediately stop any movements
"""
self._send_command(command_str=f"<F{self._focuser_number:1d}HALT>", expected_reply="HALTED")
message = f"Focuser {self.uid} halted"
self.logger.warning(message)
warn(message)
self._update_focuser_status()
##################################################################################################
# Private Methods
##################################################################################################
def _initialise(self):
self._get_hub_info()
self._get_focuser_config()
self._update_focuser_status()
self.model = self._focuser_config["Dev Typ"]
self._max_position = int(self._focuser_config["Max Pos"])
self.logger.info(f"{self} initialised")
def _get_hub_info(self):
self._hub_info = self._send_command(command_str="<FHGETHUBINFO>", expected_reply="HUB INFO")
def _get_focuser_config(self):
command_str = f"<F{self._focuser_number:1d}GETCONFIG>"
expected_reply = f"CONFIG{self._focuser_number:1d}"
self._focuser_config = self._send_command(command_str, expected_reply)
def _update_focuser_status(self):
command_str = f"<F{self._focuser_number:1d}GETSTATUS>"
expected_reply = f"STATUS{self._focuser_number:1d}"
self._focuser_status = self._send_command(command_str, expected_reply)
self._position = int(self._focuser_status["Curr Pos"])
self._target_position = int(self._focuser_status["Targ Pos"])
self._is_moving = bool(int(self._focuser_status["IsMoving"]))
self._temperature = float(self._focuser_status["Temp(C)"])
def _send_command(self, command_str, expected_reply):
"""
Utility function that handles the common aspects of sending commands and
parsing responses.
"""
# Make sure we start with a clean slate
self._serial_port.reset_output_buffer()
self._serial_port.reset_input_buffer()
# Send command
self._serial_port.write(command_str.encode("ascii"))
response = str(self._serial_port.readline(), encoding="ascii").strip()
if not response:
message = f"No response to command '{command_str}' from focuser {self.uid}"
self.logger.error(message)
raise RuntimeError(message)
# Should always get '!' back unless there's an error
if response != "!":
message = f"Error sending command '{command_str}' to focuser {self.uid}: {response}"
self.logger.error(message)
raise RuntimeError(message)
# Next line identifies the command the focuser is replying to.
command_echo = str(self._serial_port.readline(), encoding="ascii").strip()
if command_echo != expected_reply:
message = f"Expected reply '{expected_reply}' from {self.uid}, got '{command_echo}'"
self.logger.error(message)
raise RuntimeError(message)
# For get info type commands then get several lines of key = value, then 'END'
if expected_reply in ("HUB INFO", "CONFIG1", "CONFIG2", "STATUS1", "STATUS2"):
info = {}
response = str(self._serial_port.readline(), encoding="ascii").strip()
while response != "END":
key, value = (item.strip() for item in response.split("="))
info[key] = value
response = str(self._serial_port.readline(), encoding="ascii").strip()
return info
def _add_fits_keywords(self, header):
header = super()._add_fits_keywords(header)
header.set("FOC-MOD", self.model, "Focuser device type")
header.set("FOC-ID", self.uid, "Focuser nickname")
header.set("FOC-HW", self.hardware_version, "Focuser device type")
header.set("FOC-FW", self.firmware_version, "Focuser controller firmware version")
header.set("FOC-TEMP", self.temperature.value, "Focuser temperature (deg C)")
return header