Source code for panoptes.pocs.camera.zwo

import threading
import time
from contextlib import suppress

import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.time import Time
from panoptes.pocs.camera.libasi import ASIDriver
from panoptes.pocs.camera.sdk import AbstractSDKCamera
from panoptes.utils import error
from panoptes.utils.images import fits as fits_utils
from panoptes.utils.utils import get_quantity_value


[docs] class Camera(AbstractSDKCamera): _driver = None # Class variable to store the ASI driver interface _cameras = [] # Cache of camera string IDs _assigned_cameras = set() # Camera string IDs already in use. def __init__(self, name='ZWO ASI Camera', gain=100, image_type=None, *args, **kwargs): """ ZWO ASI Camera class Args: serial_number (str): camera serial number or user set ID (up to 8 bytes). See notes. gain (int, optional): gain setting, using camera's internal units. If not given the camera will use its current or default setting. image_type (str, optional): image format to use (one of 'RAW8', 'RAW16', 'RGB24' or 'Y8'). Default is to use 'RAW16' if supported by the camera, otherwise the camera's own default will be used. *args, **kwargs: additional arguments to be passed to the parent classes. Notes: ZWO ASI cameras don't have a 'port', they only have a non-deterministic integer camera_ID and, probably, an 8 byte serial number. Optionally they also have an 8 byte ID that can be written to the camera firmware by the user (using ASICap, or pocs.camera.libasi.ASIDriver.set_ID()). The camera should be identified by its serial number or, if it doesn't have one, by the user set ID. """ kwargs['readout_time'] = kwargs.get('readout_time', 0.1) kwargs['timeout'] = kwargs.get('timeout', 5) # ZWO cameras cannot take internal darks (not even supported in the API yet). kwargs['internal_darks'] = kwargs.get('internal_darks', False) self._video_event = threading.Event() super().__init__(name, ASIDriver, *args, **kwargs) # Increase default temperature_tolerance for ZWO cameras because the # default value is too low for their temperature resolution. self.temperature_tolerance = kwargs.get('temperature_tolerance', 0.6 * u.Celsius) if gain: self.gain = gain if image_type: self.image_type = image_type else: # Take monochrome 12 bit raw images by default, if we can if 'RAW16' in self.properties['supported_video_format']: self.image_type = 'RAW16' self.logger.info('{} initialised'.format(self)) def __del__(self): """ Attempt some clean up """ with suppress(AttributeError, TypeError): camera_ID = self._handle self._driver.close_camera(camera_ID) self.logger.debug(f'Closed ZWO camera {camera_ID}') super().__del__() # Properties @property def image_type(self): """ Current camera image type, one of 'RAW8', 'RAW16', 'Y8', 'RGB24' """ roi_format = self._driver.get_roi_format(self._handle) return roi_format['image_type'] @image_type.setter def image_type(self, new_image_type): if new_image_type not in self.properties['supported_video_format']: msg = "Image type '{} not supported by {}".format(new_image_type, self.model) self.logger.error(msg) raise ValueError(msg) roi_format = self._driver.get_roi_format(self._handle) roi_format['image_type'] = new_image_type self._driver.set_roi_format(self._handle, **roi_format) @property def bit_depth(self): """ADC bit depth""" return self.properties['bit_depth'] @property def temperature(self): """ Current temperature of the camera's image sensor """ return self._control_getter('TEMPERATURE')[0] @AbstractSDKCamera.target_temperature.getter def target_temperature(self): """ Current value of the target temperature for the camera's image sensor cooling control. Can be set by assigning an astropy.units.Quantity """ return self._control_getter('TARGET_TEMP')[0] @AbstractSDKCamera.cooling_enabled.getter def cooling_enabled(self): """ Current status of the camera's image sensor cooling system (enabled/disabled) """ return self._control_getter('COOLER_ON')[0] @property def cooling_power(self): """ Current power level of the camera's image sensor cooling system (as a percentage). """ return self._control_getter('COOLER_POWER_PERC')[0] @property def gain(self): """ Current value of the camera's gain setting in internal units. See `egain` for the corresponding electrons / ADU value. """ return self._control_getter('GAIN')[0] @gain.setter def gain(self, gain): self._control_setter('GAIN', gain) self._refresh_info() # This will update egain value in self.properties @property def egain(self): """ Image sensor gain in e-/ADU for the current gain, as reported by the camera.""" return self.properties['e_per_adu'] @property def is_exposing(self): """ True if an exposure is currently under way, otherwise False """ return self._driver.get_exposure_status(self._handle) == "WORKING" # Methods
[docs] def connect(self, enable_cooling=False): """ Connect to ZWO ASI camera. Gets 'camera_ID' (needed for all driver commands), camera properties and details of available camera commands/parameters. """ self.logger.debug("Connecting to {}".format(self)) self._refresh_info() self._handle = self.properties['camera_ID'] self.model, _, _ = self.properties['name'].partition('(') if self.properties['has_cooler']: self._is_cooled_camera = enable_cooling if self.properties['is_color_camera']: self._filter_type = self.properties['bayer_pattern'] else: self._filter_type = 'M' # Monochrome self._driver.open_camera(self._handle) self._driver.init_camera(self._handle) self._control_info = self._driver.get_control_caps(self._handle) self._info['control_info'] = self._control_info # control info accessible via properties self._driver.disable_dark_subtract(self._handle) self._connected = True
[docs] def start_video(self, seconds, filename_root, max_frames, image_type=None): if not isinstance(seconds, u.Quantity): seconds = seconds * u.second self._control_setter('EXPOSURE', seconds) if image_type: self.image_type = image_type roi_format = self._driver.get_roi_format(self._handle) width = int(get_quantity_value(roi_format['width'], unit=u.pixel)) height = int(get_quantity_value(roi_format['height'], unit=u.pixel)) image_type = roi_format['image_type'] timeout = 2 * seconds + self.timeout * u.second video_args = (width, height, image_type, timeout, filename_root, self.file_extension, int(max_frames), self._create_fits_header(seconds, dark=False)) video_thread = threading.Thread(target=self._video_readout, args=video_args, daemon=True) self._driver.start_video_capture(self._handle) self._video_event.clear() video_thread.start() self.logger.debug("Video capture started on {}".format(self))
[docs] def stop_video(self): self._video_event.set() self._driver.stop_video_capture(self._handle) self.logger.debug("Video capture stopped on {}".format(self))
# Private methods def _set_target_temperature(self, target): self._control_setter('TARGET_TEMP', target) self._target_temperature = target def _set_cooling_enabled(self, enable): self._control_setter('COOLER_ON', enable) def _video_readout(self, width, height, image_type, timeout, filename_root, file_extension, max_frames, header): start_time = time.monotonic() good_frames = 0 bad_frames = 0 # Calculate number of bits that have been used to pad the raw data to RAW16 format. if self.image_type == 'RAW16': pad_bits = 16 - int(get_quantity_value(self.bit_depth, u.bit)) else: pad_bits = 0 for frame_number in range(max_frames): if self._video_event.is_set(): break # This call will block for up to timeout milliseconds waiting for a frame video_data = self._driver.get_video_data(self._handle, width, height, image_type, timeout) if video_data is not None: now = Time.now() header.set('DATE-OBS', now.fits, 'End of exposure + readout') filename = f'{filename_root}_{frame_number:06d}.{file_extension}' # Fix 'raw' data scaling by changing from zero padding of LSBs # to zero padding of MSBs. video_data = np.right_shift(video_data, pad_bits) self.write_fits(video_data, header, filename) good_frames += 1 else: bad_frames += 1 if frame_number == max_frames - 1: # No one callled stop_video() before max_frames so have to call it here self.stop_video() elapsed_time = (time.monotonic() - start_time) * u.second self.logger.debug("Captured {} of {} frames in {:.2f} ({:.2f} fps), {} frames lost".format( good_frames, max_frames, elapsed_time, get_quantity_value(good_frames / elapsed_time), bad_frames)) def _start_exposure(self, seconds=None, filename=None, dark=False, header=None, *args, **kwargs): self._control_setter('EXPOSURE', seconds) roi_format = self._driver.get_roi_format(self._handle) self._driver.start_exposure(self._handle) readout_args = (filename, roi_format['width'], roi_format['height'], header) return readout_args def _readout(self, filename, width, height, header): exposure_status = self._driver.get_exposure_status(self._handle) if exposure_status == 'SUCCESS': try: image_data = self._driver.get_exposure_data(self._handle, width, height, self.image_type) except RuntimeError as err: raise error.PanError(f'Error getting image data from {self}: {err}') else: # Fix 'raw' data scaling by changing from zero padding of LSBs # to zero padding of MSBs. if self.image_type == 'RAW16': pad_bits = 16 - int(get_quantity_value(self.bit_depth, u.bit)) image_data = np.right_shift(image_data, pad_bits) self.write_fits(data=image_data, header=header, filename=filename) elif exposure_status == 'FAILED': raise error.PanError(f"Exposure failed on {self}") elif exposure_status == 'IDLE': raise error.PanError(f"Exposure missing on {self}") else: raise error.PanError(f"Unexpected exposure status on {self}: '{exposure_status}'") def _create_fits_header(self, seconds, dark=None, metadata=None) -> fits.Header: header = super()._create_fits_header(seconds, dark) header.set('CAM-GAIN', self.gain, 'Internal units') header.set('XPIXSZ', get_quantity_value(self.properties['pixel_size'], u.um), 'Microns') header.set('YPIXSZ', get_quantity_value(self.properties['pixel_size'], u.um), 'Microns') return header def _refresh_info(self): self._info = self._driver.get_camera_property(self._address) def _control_getter(self, control_type): if control_type in self._control_info: return self._driver.get_control_value(self._handle, control_type) else: raise error.NotSupported("{} has no '{}' parameter".format(self.model, control_type)) def _control_setter(self, control_type, value): if control_type not in self._control_info: raise error.NotSupported("{} has no '{}' parameter".format(self.model, control_type)) control_name = self._control_info[control_type]['name'] if not self._control_info[control_type]['is_writable']: raise error.NotSupported("{} cannot set {} parameter'".format( self.model, control_name)) if value != 'AUTO': # Check limits. max_value = self._control_info[control_type]['max_value'] if value > max_value: self.logger.warning(f"Cannot set {control_name} to {value}, clipping to max value:" f" {max_value}.") self._driver.set_control_value(self._handle, control_type, max_value) return min_value = self._control_info[control_type]['min_value'] if value < min_value: self.logger.warning(f"Cannot set {control_name} to {value}, clipping to min value:" f" {min_value}.") self._driver.set_control_value(self._handle, control_type, min_value) return else: if not self._control_info[control_type]['is_auto_supported']: raise error.IllegalValue(f"{self.model} cannot set {control_name} to AUTO") self._driver.set_control_value(self._handle, control_type, value)