Source code for panoptes.pocs.focuser.focuser

"""Abstract focuser base and autofocus helpers.

Defines AbstractFocuser, a hardware-agnostic base with common movement and
autofocus utilities shared by concrete focuser drivers (serial, FocusLynx,
astromechanics, simulator). Includes methods to sweep focus positions,
measure sharpness, and optionally generate diagnostic plots.
"""

import os
from abc import ABCMeta, abstractmethod
from threading import Event, Thread

import numpy as np
from astropy.modeling import fitting, models
from scipy.ndimage import binary_dilation

from panoptes.utils.images import focus as focus_utils
from panoptes.utils.images.misc import mask_saturated
from panoptes.utils.time import current_time

from panoptes.pocs.base import PanBase
from panoptes.pocs.utils.plotting import make_autofocus_plot


[docs] class AbstractFocuser(PanBase, metaclass=ABCMeta): """Base class for all focusers. Args: name (str, optional): name of the focuser model (str, optional): model of the focuser port (str, optional): port the focuser is connected to, e.g. a device node camera (pocs.camera.Camera, optional): camera that this focuser is associated with. timeout (int, optional): time to wait for response from focuser. initial_position (int, optional): if given the focuser will move to this position following initialisation. autofocus_range ((int, int) optional): Coarse & fine focus sweep range, in encoder units autofocus_step ((int, int), optional): Coarse & fine focus sweep steps, in encoder units autofocus_seconds (scalar, optional): Exposure time for focus exposures autofocus_size (int, optional): Size of square central region of image to use, default 500 x 500 pixels. autofocus_keep_files (bool, optional): If True will keep all images taken during focusing. If False (default) will delete all except the first and last images from each focus run. autofocus_take_dark (bool, optional): If True will attempt to take a dark frame before the focus run, and use it for dark subtraction and hot pixel masking, default True. autofocus_merit_function (str/callable, optional): Merit function to use as a focus metric, default vollath_F4 autofocus_merit_function_kwargs (dict, optional): Dictionary of additional keyword arguments for the merit function. autofocus_mask_dilations (int, optional): Number of iterations of dilation to perform on the saturated pixel mask (determine size of masked regions), default 10 autofocus_make_plots (bool, optional: Whether to write focus plots to images folder, default False. """ def __init__( self, name="Generic Focuser", model="simulator", port=None, camera=None, timeout=5, initial_position=None, autofocus_range=None, autofocus_step=None, autofocus_seconds=None, autofocus_size=None, autofocus_keep_files=None, autofocus_take_dark=None, autofocus_merit_function=None, autofocus_merit_function_kwargs=None, autofocus_mask_dilations=None, autofocus_make_plots=False, *args, **kwargs, ): super().__init__(*args, **kwargs) self.model = model self.port = port self.name = name self._connected = False self._serial_number = "XXXXXX" self.timeout = timeout if initial_position is None: self._position = None else: self._position = int(initial_position) self._set_autofocus_parameters( autofocus_range, autofocus_step, autofocus_seconds, autofocus_size, autofocus_keep_files, autofocus_take_dark, autofocus_merit_function, autofocus_merit_function_kwargs, autofocus_mask_dilations, autofocus_make_plots, ) self._autofocus_error = None self._camera = camera self.logger.debug(f"Focuser created: {self.name} on {self.port}") ################################################################################################## # Properties ################################################################################################## @property def uid(self): """A serial number for the focuser""" return self._serial_number @property def is_connected(self): """Is the focuser available""" return self._connected @property def position(self): """Current encoder position of the focuser""" return self._position @position.setter def position(self, position): """Move focusser to new encoder position""" self.move_to(position) @property def camera(self): """ Reference to the Camera object that the Focuser is assigned to, if any. A Focuser should only ever be assigned to one or zero Cameras! """ return self._camera @camera.setter def camera(self, camera): """Assign the focuser to a camera. Args: camera (AbstractCamera): The camera instance to associate with this focuser. """ if self._camera: if self._camera != camera: self.logger.warning( f"{self} already assigned to {self._camera}, skipping attempted assignment to {camera}!" ) else: self._camera = camera
[docs] @abstractmethod def min_position(self): """Get position of close limit of focus travel, in encoder units""" raise NotImplementedError
[docs] @abstractmethod def max_position(self): """Get position of far limit of focus travel, in encoder units""" raise NotImplementedError
[docs] @abstractmethod def is_moving(self): """True if the focuser is currently moving.""" raise NotImplementedError
@property def is_ready(self): """Whether the focuser is ready for a new move. Returns: bool: True if the focuser is not currently moving. """ # A focuser is 'ready' if it is not currently moving. return not self.is_moving @property def autofocus_error(self): """Error message from the most recent autofocus or None, if there was no error.""" return self._autofocus_error ################################################################################################## # Methods ##################################################################################################
[docs] @abstractmethod def move_to(self, position): """Move focuser to new encoder position. Args: position (int): new focuser position, in encoder units. Returns: int: focuser position following the move, in encoder units. """ raise NotImplementedError
[docs] def move_by(self, increment): """Move focuser by a given amount. Args: increment (int): distance to move the focuser, in encoder units. Returns: int: focuser position following the move, in encoder units. """ return self.move_to(self.position + increment)
[docs] def autofocus( self, seconds=None, focus_range=None, focus_step=None, cutout_size=None, keep_files=None, take_dark=None, merit_function=None, merit_function_kwargs=None, mask_dilations=None, coarse=False, make_plots=None, filter_name=None, blocking=False, ): """ Focuses the camera using the specified merit function. Optionally performs a coarse focus to find the approximate position of infinity focus, which should be followed by a fine focus before observing. Args: seconds (scalar, optional): Exposure time for focus exposures, if not specified will use value from config. focus_range (2-tuple, optional): Coarse & fine focus sweep range, in encoder units. Specify to override values from config. focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in encoder units. Specify to override values from config. cutout_size (int, optional): Size of square central region of image to use, default 500 x 500 pixels. keep_files (bool, optional): If True will keep all images taken during focusing. If False (default) will delete all except the first and last images from each focus run. take_dark (bool, optional): If True will attempt to take a dark frame before the focus run, and use it for dark subtraction and hot pixel masking, default True. merit_function (str/callable, optional): Merit function to use as a focus metric, default vollath_F4. merit_function_kwargs (dict, optional): Dictionary of additional keyword arguments for the merit function. mask_dilations (int, optional): Number of iterations of dilation to perform on the saturated pixel mask (determine size of masked regions), default 10 coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform a fine focus. Default False. make_plots (bool, optional): Whether to write focus plots to images folder. If not given will fall back on value of `autofocus_make_plots` set on initialisation, and if it wasn't set then will default to False. filter_name (str, optional): The filter to use for focusing. If not provided, will use last light position. blocking (bool, optional): Whether to block until autofocus complete, default False. Returns: threading.Event: Event that will be set when autofocusing is complete Raises: ValueError: If invalid values are passed for any of the focus parameters. """ self.logger.debug("Starting autofocus") assert self._camera.is_connected, self.logger.error("Camera must be connected for autofocus!") assert self.is_connected, self.logger.error("Focuser must be connected for autofocus!") if not focus_range: if self.autofocus_range: focus_range = self.autofocus_range else: raise ValueError(f"No focus_range specified, aborting autofocus of {self._camera}!") if not focus_step: if self.autofocus_step: focus_step = self.autofocus_step else: raise ValueError(f"No focus_step specified, aborting autofocus of {self._camera}!") if not seconds: if self.autofocus_seconds: seconds = self.autofocus_seconds else: raise ValueError("No focus exposure time specified, aborting autofocus of {}!", self._camera) if not cutout_size: if self.autofocus_size: cutout_size = self.autofocus_size else: raise ValueError("No focus thumbnail size specified, aborting autofocus of {}!", self._camera) if keep_files is None: if self.autofocus_keep_files: keep_files = True else: keep_files = False if take_dark is None: if self.autofocus_take_dark is not None: take_dark = self.autofocus_take_dark else: take_dark = True if not merit_function: if self.autofocus_merit_function: merit_function = self.autofocus_merit_function else: merit_function = "vollath_F4" if not merit_function_kwargs: if self.autofocus_merit_function_kwargs: merit_function_kwargs = self.autofocus_merit_function_kwargs else: merit_function_kwargs = {} if mask_dilations is None: if self.autofocus_mask_dilations is not None: mask_dilations = self.autofocus_mask_dilations else: mask_dilations = 10 if make_plots is None: make_plots = self.autofocus_make_plots # Move filterwheel to the correct position if self.camera is not None: if self.camera.has_filterwheel: if filter_name is None: # NOTE: The camera will move the FW to the last light position automatically self.logger.warning( f"Filter name not provided for autofocus on {self}. Using last light position." ) else: self.logger.info(f"Moving filterwheel to {filter_name} for autofocusing on {self}.") self.camera.filterwheel.move_to(filter_name, blocking=True) elif filter_name is None: self.logger.warning( f"Filter {filter_name} requiested for autofocus but {self.camera} has no filterwheel." ) # Set up the focus parameters focus_event = Event() focus_params = { "seconds": seconds, "focus_range": focus_range, "focus_step": focus_step, "cutout_size": cutout_size, "keep_files": keep_files, "take_dark": take_dark, "merit_function": merit_function, "merit_function_kwargs": merit_function_kwargs, "mask_dilations": mask_dilations, "coarse": coarse, "make_plots": make_plots, "focus_event": focus_event, } focus_thread = Thread(target=self._autofocus, kwargs=focus_params) focus_thread.start() if blocking: focus_event.wait() return focus_event
def _autofocus( self, seconds, focus_range, focus_step, cutout_size, keep_files, take_dark, merit_function, merit_function_kwargs, mask_dilations, make_plots, coarse, focus_event, *args, **kwargs, ): """Private helper method for calling autofocus in a Thread. See public `autofocus` for information about the parameters. """ focus_type = "fine" if coarse: focus_type = "coarse" initial_focus = self.position self.logger.debug( f"Beginning {focus_type} autofocus of {self._camera} - initial position: {initial_focus}" ) # Set up paths for temporary focus files, and plots if requested. image_dir = self.get_config("directories.images") start_time = current_time(flatten=True) file_path_root = os.path.join(image_dir, "focus", self._camera.uid, start_time) self._autofocus_error = None dark_cutout = None if take_dark: dark_path = os.path.join(file_path_root, f"dark.{self._camera.file_extension}") self.logger.debug(f"Taking dark frame {dark_path} on camera {self._camera}") try: dark_cutout = self._camera.get_cutout( seconds, dark_path, cutout_size, keep_file=True, dark=True ) # Mask 'saturated' with a low threshold to remove hot pixels dark_cutout = mask_saturated(dark_cutout, threshold=0.3, bit_depth=self.camera.bit_depth) except Exception as err: self.logger.error(f"Error taking dark frame: {err!r}") self._autofocus_error = repr(err) focus_event.set() raise err # Take an image before focusing, grab a cutout from the centre and add it to the plot initial_fn = f"{initial_focus}-{focus_type}-initial.{self._camera.file_extension}" initial_path = os.path.join(file_path_root, initial_fn) try: initial_cutout = self._camera.get_cutout(seconds, initial_path, cutout_size, keep_file=True) initial_cutout = mask_saturated(initial_cutout, bit_depth=self.camera.bit_depth) if dark_cutout is not None: initial_cutout = initial_cutout.astype(np.int32) - dark_cutout except Exception as err: self.logger.error(f"Error taking initial image: {err!r}") self._autofocus_error = repr(err) focus_event.set() raise err # Set up encoder positions for autofocus sweep, truncating at focus travel # limits if required. if coarse: focus_range = focus_range[1] focus_step = focus_step[1] else: focus_range = focus_range[0] focus_step = focus_step[0] # Get focus steps. focus_positions = np.arange( max(initial_focus - focus_range / 2, self.min_position), min(initial_focus + focus_range / 2, self.max_position) + 1, focus_step, dtype=int, ) n_positions = len(focus_positions) # Set up empty array holders cutouts = np.zeros((n_positions, cutout_size, cutout_size), dtype=initial_cutout.dtype) masks = np.empty((n_positions, cutout_size, cutout_size), dtype=bool) metrics = np.empty(n_positions) # Take and store an exposure for each focus position. for i, position in enumerate(focus_positions): # Move focus, updating focus_positions with actual encoder position after move. focus_positions[i] = self.move_to(position) focus_fn = f"{focus_positions[i]}-{i:02d}.{self._camera.file_extension}" file_path = os.path.join(file_path_root, focus_fn) # Take exposure. try: cutouts[i] = self._camera.get_cutout(seconds, file_path, cutout_size, keep_file=keep_files) except Exception as err: self.logger.error(f"Error taking image {i + 1}: {err!r}") self._autofocus_error = repr(err) focus_event.set() raise err masks[i] = mask_saturated(cutouts[i], bit_depth=self.camera.bit_depth).mask self.logger.debug(f"Making master mask with binary dilation for {self._camera}") master_mask = masks.any(axis=0) master_mask = binary_dilation(master_mask, iterations=mask_dilations) # Apply the master mask and then get metrics for each frame. for i, cutout in enumerate(cutouts): self.logger.debug(f"Applying focus metric to cutout {i:02d}") if dark_cutout is not None: cutout = cutout.astype(np.float32) - dark_cutout cutout = np.ma.array(cutout, mask=np.ma.mask_or(master_mask, np.ma.getmask(cutout))) metrics[i] = focus_utils.focus_metric(cutout, merit_function, **merit_function_kwargs) self.logger.debug(f"Focus metric for cutout {i:02d}: {metrics[i]}") # Only fit a fine focus. fitted = False fitting_indices = [None, None] # Find maximum metric values. imax = metrics.argmax() if imax == 0 or imax == (n_positions - 1): # TODO: have this automatically switch to coarse focus mode if this happens self.logger.warning( f"Best focus outside sweep range, stopping focus and using {focus_positions[imax]}" ) best_focus = focus_positions[imax] elif not coarse: # Fit data around the maximum value to determine best focus position. # Initialise models shift = models.Shift(offset=-focus_positions[imax]) # Small initial coeffs with expected sign. Helps fitting start in the right direction. poly = models.Polynomial1D( degree=4, c0=1, c1=0, c2=-1e-2, c3=0, c4=-1e-4, fixed={"c0": True, "c1": True, "c3": True}, ) scale = models.Scale(factor=metrics[imax]) # https://docs.astropy.org/en/stable/modeling/compound-models.html?#model-composition reparameterised_polynomial = shift | poly | scale # Initialise fitter fitter = fitting.LevMarLSQFitter() # Select data range for fitting. Tries to use 2 points either side of max, if in range. fitting_indices = (max(imax - 2, 0), min(imax + 2, n_positions - 1)) # Fit models to data fit = fitter( reparameterised_polynomial, focus_positions[fitting_indices[0] : fitting_indices[1] + 1], metrics[fitting_indices[0] : fitting_indices[1] + 1], ) # Get the encoder position of the best focus. best_focus = np.abs(fit.offset_0) fitted = True # Guard against fitting failures, force best focus to stay within sweep range. min_focus = focus_positions[0] max_focus = focus_positions[-1] if best_focus < min_focus: self.logger.warning(f"Fitting failure: best focus {best_focus} below sweep limit {min_focus}") best_focus = focus_positions[1] if best_focus > max_focus: self.logger.warning(f"Fitting failure: best focus {best_focus} above sweep limit {max_focus}") best_focus = focus_positions[-2] else: # Coarse focus, just use max value. best_focus = focus_positions[imax] # Move the focuser to best focus position. final_focus = self.move_to(best_focus) # Get final cutout. final_fn = f"{final_focus}-{focus_type}-final.{self._camera.file_extension}" file_path = os.path.join(file_path_root, final_fn) try: final_cutout = self._camera.get_cutout(seconds, file_path, cutout_size, keep_file=True) final_cutout = mask_saturated(final_cutout, bit_depth=self.camera.bit_depth) if dark_cutout is not None: final_cutout = final_cutout.astype(np.int32) - dark_cutout except Exception as err: self.logger.error(f"Error taking final image: {err!r}") self._autofocus_error = repr(err) focus_event.set() raise err if make_plots: line_fit = None if fitted: focus_range = np.arange( focus_positions[fitting_indices[0]], focus_positions[fitting_indices[1]] + 1 ) fit_line = fit(focus_range) line_fit = [focus_range, fit_line] plot_title = f"{self._camera} {focus_type} focus at {start_time}" # Make the plots plot_path = os.path.join(file_path_root, f"{focus_type}-focus.png") plot_path = make_autofocus_plot( plot_path, initial_cutout, final_cutout, initial_focus, final_focus, focus_positions, metrics, merit_function, plot_title=plot_title, line_fit=line_fit, ) self.logger.info( f"{focus_type.capitalize()} focus plot for {self._camera} written to {plot_path}" ) self.logger.debug(f"Autofocus of {self._camera} complete - final focus position: {final_focus}") if focus_event: focus_event.set() return initial_focus, final_focus def _set_autofocus_parameters( self, autofocus_range, autofocus_step, autofocus_seconds, autofocus_size, autofocus_keep_files, autofocus_take_dark, autofocus_merit_function, autofocus_merit_function_kwargs, autofocus_mask_dilations, autofocus_make_plots, ): # Moved to a separate private method to make it possible to override. if autofocus_range: self.autofocus_range = (int(autofocus_range[0]), int(autofocus_range[1])) else: self.autofocus_range = None if autofocus_step: self.autofocus_step = (int(autofocus_step[0]), int(autofocus_step[1])) else: self.autofocus_step = None self.autofocus_seconds = autofocus_seconds self.autofocus_size = autofocus_size self.autofocus_keep_files = autofocus_keep_files self.autofocus_take_dark = autofocus_take_dark self.autofocus_merit_function = autofocus_merit_function self.autofocus_merit_function_kwargs = autofocus_merit_function_kwargs self.autofocus_mask_dilations = autofocus_mask_dilations self.autofocus_make_plots = bool(autofocus_make_plots) def _add_fits_keywords(self, header): header.set("FOC-NAME", self.name, "Focuser name") header.set("FOC-MOD", self.model, "Focuser model") header.set("FOC-ID", self.uid, "Focuser serial number") header.set("FOC-POS", self.position, "Focuser position") return header def __str__(self): try: s = f"{self.name} ({self.uid}) on {self.port}" except Exception: s = str(__class__) return s