"""ZWO ASI camera implementation using the ASICamera2 SDK.
Exposes a Camera class that wraps the ASIDriver to control cooled ZWO cameras,
including ROI/image type, binning, gain, bandwidth, cooling, single exposures,
and basic video capture.
"""
import threading
import time
from contextlib import suppress
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.time import Time
from panoptes.utils import error
from panoptes.utils.utils import get_quantity_value
from panoptes.pocs.camera.libasi import ASIDriver
from panoptes.pocs.camera.sdk import AbstractSDKCamera
[docs]
class Camera(AbstractSDKCamera):
"""ZWO ASI camera controlled via the ASICamera2 SDK.
Provides convenience properties for ROI, binning, image type, gain, and
bandwidth settings, and supports single exposures and basic video capture.
"""
_driver = None # Class variable to store the ASI driver interface
_cameras = [] # Cache of camera string IDs
_assigned_cameras = set() # Camera string IDs already in use.
def __init__(
self,
name: str = "ZWO ASI Camera",
gain: int | None = 120,
image_type: str | None = "RAW16",
bandwidthoverload: float = 50,
binning: int = 2,
fix_bit_padding: bool = False,
*args,
**kwargs,
):
"""
ZWO ASI Camera class
Args:
name (str): camera serial number or user set ID (up to 8 bytes). See notes.
gain (int, optional): gain setting, using camera's internal units. If not given
the camera will use its current or default setting.
image_type (str, optional): image format to use (one of 'RAW8', 'RAW16', 'RGB24'
or 'Y8'). Default is to use 'RAW16' if supported by the camera, otherwise
the camera's own default will be used.
bandwidthoverload (int, optional): bandwidth overload setting in percent,
default is 99.
binning (int, optional): binning factor to use for the camera, default is 2,
which is quad binning.
fix_bit_padding (bool, optional): whether to right-shift RAW16 data to convert
from zero-padded LSBs to zero-padded MSBs (native 12-bit). Default is False.
*args, **kwargs: additional arguments to be passed to the parent classes.
Notes:
ZWO ASI cameras don't have a 'port', they only have a non-deterministic integer
camera_ID and, probably, an 8 byte serial number. Optionally they also have an
8 byte ID that can be written to the camera firmware by the user (using ASICap,
or pocs.camera.libasi.ASIDriver.set_ID()). The camera should be identified by
its serial number or, if it doesn't have one, by the user set ID.
"""
kwargs["readout_time"] = kwargs.get("readout_time", 0.1)
kwargs["timeout"] = kwargs.get("timeout", 5)
# ZWO cameras cannot take internal darks (not even supported in the API yet).
kwargs["internal_darks"] = kwargs.get("internal_darks", False)
self._video_event = threading.Event()
self._fix_bit_padding = fix_bit_padding
super().__init__(name, ASIDriver, *args, **kwargs)
# Increase default temperature_tolerance for ZWO cameras because the
# default value is too low for their temperature resolution.
self.temperature_tolerance = kwargs.get("temperature_tolerance", 0.6 * u.Celsius)
if gain:
self.gain = gain
if bandwidthoverload is not None:
self.bandwidthoverload = bandwidthoverload
if image_type:
self.image_type = image_type
else:
# Take monochrome 12 bit raw images by default, if we can
if "RAW16" in self.properties["supported_video_format"]:
self.image_type = "RAW16"
if binning is not None:
self.binning = binning
self.logger.info(f"{self} initialised")
def __del__(self):
"""Attempt some clean up"""
with suppress(AttributeError, TypeError):
camera_ID = self._handle
self._driver.close_camera(camera_ID)
self.logger.debug(f"Closed ZWO camera {camera_ID}")
super().__del__()
# Properties
@property
def roi(self) -> dict:
"""Get the ROI of the camera, which includes the width, height, binning, and image_type."""
roi_format = self._driver.get_roi_format(self._handle)
return roi_format
@property
def image_type(self) -> str:
"""Current camera image type, one of 'RAW8', 'RAW16', 'Y8', 'RGB24'"""
return self.roi.get("image_type")
@image_type.setter
def image_type(self, new_image_type: str):
"""Set the camera image type.
Args:
new_image_type (str): One of 'RAW8', 'RAW16', 'RGB24', or 'Y8'.
Raises:
ValueError: If the requested type is not supported by this camera.
"""
if new_image_type not in self.properties["supported_video_format"]:
msg = f"Image type '{new_image_type} not supported by {self.model}"
self.logger.error(msg)
raise ValueError(msg)
roi_format = self.roi
roi_format["image_type"] = new_image_type
self._driver.set_roi_format(self._handle, **roi_format)
@property
def binning(self) -> int:
"""Current camera binning setting, either `1` (no binning) or `2` (binning)."""
return self.roi.get("binning")
@binning.setter
def binning(self, new_binning: int):
"""Set camera binning (1 or 2) and adjust ROI accordingly.
Args:
new_binning (int): Desired binning factor. Must be supported by the camera.
Raises:
ValueError: If the requested binning is not supported.
"""
if new_binning not in self.properties["supported_bins"]:
msg = f"Binning '{new_binning}' not supported by {self.model}"
self.logger.error(msg)
raise ValueError(msg)
roi_format = self.roi
roi_format["binning"] = new_binning
roi_format["width"] = roi_format["width"].to_value() // new_binning
roi_format["height"] = roi_format["height"].to_value() // new_binning
self.logger.debug(f"Setting binning to {new_binning}")
try:
self._driver.set_roi_format(self._handle, **roi_format)
except Exception as e:
self.logger.error(f"Failed to set binning '{new_binning}': {e}")
@property
def image_size(self) -> tuple[u.Quantity, u.Quantity]:
"""Current camera image size, either `(width, height)`."""
width = self.roi.get("width")
height = self.roi.get("height")
return width, height
@property
def width(self) -> u.Quantity:
"""Current image width"""
return self.image_size[0]
@property
def height(self) -> u.Quantity:
"""Current image height"""
return self.image_size[1]
@property
def bit_depth(self):
"""ADC bit depth"""
return self.properties["bit_depth"]
@property
def temperature(self):
"""Current temperature of the camera's image sensor"""
return self._control_getter("TEMPERATURE")[0]
@AbstractSDKCamera.target_temperature.getter
def target_temperature(self):
"""Current value of the target temperature for the camera's image sensor cooling control.
Can be set by assigning an astropy.units.Quantity
"""
return self._control_getter("TARGET_TEMP")[0]
@AbstractSDKCamera.cooling_enabled.getter
def cooling_enabled(self):
"""Current status of the camera's image sensor cooling system (enabled/disabled)"""
return self._control_getter("COOLER_ON")[0]
@property
def cooling_power(self):
"""Current power level of the camera's image sensor cooling system (as a percentage)."""
return self._control_getter("COOLER_POWER_PERC")[0]
@property
def gain(self):
"""Current value of the camera's gain setting in internal units.
See `egain` for the corresponding electrons / ADU value.
"""
return self._control_getter("GAIN")[0]
@gain.setter
def gain(self, gain):
"""Set the camera's internal gain value and refresh derived properties.
Args:
gain (int): Gain value in the camera's native units.
"""
self._control_setter("GAIN", gain)
self._refresh_info() # This will update egain value in self.properties
@property
def egain(self):
"""Image sensor gain in e-/ADU for the current gain, as reported by the camera."""
return self.properties["e_per_adu"]
@property
def is_exposing(self):
"""True if an exposure is currently under way, otherwise False"""
return self._driver.get_exposure_status(self._handle) == "WORKING"
@property
def bandwidthoverload(self):
"""USB bandwidth usage limit as a percentage.
Returns:
int | float: The current bandwidth overload percentage.
"""
return self._control_getter("BANDWIDTHOVERLOAD")[0]
@bandwidthoverload.setter
def bandwidthoverload(self, value):
"""Set the USB bandwidth overload percentage.
Args:
value (int | float | Quantity): Percentage (0–100). Quantities
with units of percent are accepted.
"""
value = get_quantity_value(value, u.percent) * u.percent
self._control_setter("BANDWIDTHOVERLOAD", value)
# Methods
[docs]
def connect(self, enable_cooling=False):
"""
Connect to ZWO ASI camera.
Gets 'camera_ID' (needed for all driver commands), camera properties and details
of available camera commands/parameters.
"""
self.logger.debug(f"Connecting to {self}")
self._refresh_info()
self._handle = self.properties["camera_ID"]
self.model, _, _ = self.properties["name"].partition("(")
if self.properties["has_cooler"]:
self._is_cooled_camera = enable_cooling
if self.properties["is_color_camera"]:
self._filter_type = self.properties["bayer_pattern"]
else:
self._filter_type = "M" # Monochrome
self._driver.open_camera(self._handle)
self._driver.init_camera(self._handle)
self._control_info = self._driver.get_control_caps(self._handle)
self._info["control_info"] = self._control_info # control info accessible via properties
self._driver.disable_dark_subtract(self._handle)
self._connected = True
[docs]
def take_exposure(
self,
seconds=1.0 * u.second,
filename=None,
metadata=None,
dark=False,
blocking=False,
timeout=10 * u.second,
*args,
**kwargs,
) -> threading.Thread:
"""Take an exposure, clipping exposure time to camera's valid range.
Ensures the exposure time is within the camera's valid range before
taking the exposure. This prevents issues where setting an exposure time
below the minimum (e.g., zero) would not be properly handled.
Args:
seconds: Length of exposure (will be clipped to valid range)
filename: Image filename
metadata: FITS header metadata
dark: Whether this is a dark frame
blocking: Whether to block until exposure completes
timeout: Additional timeout beyond exposure + readout time
*args, **kwargs: Additional arguments passed to parent class
Returns:
threading.Thread: The readout thread
"""
# Ensure seconds is a Quantity
if not isinstance(seconds, u.Quantity):
seconds = seconds * u.second
# Clip exposure time to valid range before proceeding
exposure_info = self._control_info.get("EXPOSURE")
if exposure_info:
min_exposure = exposure_info["min_value"]
max_exposure = exposure_info["max_value"]
if seconds < min_exposure:
self.logger.debug(
f"Exposure time {seconds} is below minimum {min_exposure}, clipping to minimum"
)
seconds = min_exposure
elif seconds > max_exposure:
self.logger.debug(
f"Exposure time {seconds} is above maximum {max_exposure}, clipping to maximum"
)
seconds = max_exposure
# Call parent class with clipped exposure time
return super().take_exposure(
seconds=seconds,
filename=filename,
metadata=metadata,
dark=dark,
blocking=blocking,
timeout=timeout,
*args,
**kwargs,
)
[docs]
def start_video(self, seconds, filename_root, max_frames, image_type=None):
"""Start video capture and write frames to FITS files.
Args:
seconds (float | Quantity): Exposure time per frame.
filename_root (str): Prefix for output filenames (frame number appended).
max_frames (int): Maximum number of frames to capture before stopping.
image_type (str | None): Optional image type override (e.g., 'RAW16').
"""
if not isinstance(seconds, u.Quantity):
seconds = seconds * u.second
self._control_setter("EXPOSURE", seconds)
if image_type:
self.image_type = image_type
roi_format = self._driver.get_roi_format(self._handle)
width = int(get_quantity_value(roi_format["width"], unit=u.pixel))
height = int(get_quantity_value(roi_format["height"], unit=u.pixel))
image_type = roi_format["image_type"]
timeout = 2 * seconds + self.timeout * u.second
video_args = (
width,
height,
image_type,
timeout,
filename_root,
self.file_extension,
int(max_frames),
self._create_fits_header(seconds, dark=False),
)
video_thread = threading.Thread(target=self._video_readout, args=video_args, daemon=True)
self._driver.start_video_capture(self._handle)
self._video_event.clear()
video_thread.start()
self.logger.debug(f"Video capture started on {self}")
[docs]
def stop_video(self):
"""Stop video capture and signal the reader thread to finish."""
self._video_event.set()
self._driver.stop_video_capture(self._handle)
self.logger.debug(f"Video capture stopped on {self}")
# Private methods
def _set_target_temperature(self, target):
self._control_setter("TARGET_TEMP", target)
self._target_temperature = target
def _set_cooling_enabled(self, enable):
self._control_setter("COOLER_ON", enable)
def _video_readout(
self, width, height, image_type, timeout, filename_root, file_extension, max_frames, header
):
start_time = time.monotonic()
good_frames = 0
bad_frames = 0
# Calculate number of bits that have been used to pad the raw data to RAW16 format.
if self.image_type == "RAW16":
pad_bits = 16 - int(get_quantity_value(self.bit_depth, u.bit))
else:
pad_bits = 0
for frame_number in range(max_frames):
if self._video_event.is_set():
break
# This call will block for up to timeout milliseconds waiting for a frame
video_data = self._driver.get_video_data(self._handle, width, height, image_type, timeout)
if video_data is not None:
now = Time.now()
header.set("DATE-OBS", now.fits, "End of exposure + readout")
filename = f"{filename_root}_{frame_number:06d}.{file_extension}"
# Fix 'raw' data scaling by changing from zero padding of LSBs
# to zero padding of MSBs.
if self._fix_bit_padding:
video_data = np.right_shift(video_data, pad_bits)
self.write_fits(video_data, header, filename)
good_frames += 1
else:
bad_frames += 1
if frame_number == max_frames - 1:
# No one callled stop_video() before max_frames so have to call it here
self.stop_video()
elapsed_time = (time.monotonic() - start_time) * u.second
self.logger.debug(
f"Captured {good_frames} of {max_frames} frames in {elapsed_time:.2f} "
f"({get_quantity_value(good_frames / elapsed_time):.2f} fps), "
f"{bad_frames} frames lost"
)
def _start_exposure(self, seconds=None, filename=None, dark=False, header=None, *args, **kwargs):
self._control_setter("EXPOSURE", seconds)
roi_format = self._driver.get_roi_format(self._handle)
self._driver.start_exposure(self._handle)
readout_args = (filename, roi_format["width"], roi_format["height"], header)
return readout_args
def _readout(self, filename, width, height, header):
exposure_status = self._driver.get_exposure_status(self._handle)
if exposure_status == "SUCCESS":
try:
image_data = self._driver.get_exposure_data(self._handle, width, height, self.image_type)
except RuntimeError as err:
raise error.PanError(f"Error getting image data from {self}: {err}")
else:
# Fix 'raw' data scaling by changing from zero padding of LSBs
# to zero padding of MSBs.
if self._fix_bit_padding and self.image_type == "RAW16":
pad_bits = 16 - int(get_quantity_value(self.bit_depth, u.bit))
image_data = np.right_shift(image_data, pad_bits)
self.write_fits(data=image_data, header=header, filename=filename)
elif exposure_status == "FAILED":
raise error.PanError(f"Exposure failed on {self}")
elif exposure_status == "IDLE":
raise error.PanError(f"Exposure missing on {self}")
else:
raise error.PanError(f"Unexpected exposure status on {self}: '{exposure_status}'")
def _create_fits_header(self, seconds, dark=None, metadata=None) -> fits.Header:
header = super()._create_fits_header(seconds, dark)
header.set("CAM-GAIN", self.gain, "Internal units")
header.set(
"XPIXSZ",
get_quantity_value(self.properties["pixel_size"] * self.binning, u.um),
"Microns",
)
header.set(
"YPIXSZ",
get_quantity_value(self.properties["pixel_size"] * self.binning, u.um),
"Microns",
)
return header
def _refresh_info(self):
self._info = self._driver.get_camera_property(self._address)
def _control_getter(self, control_type):
if control_type in self._control_info:
return self._driver.get_control_value(self._handle, control_type)
else:
raise error.NotSupported(f"{self.model} has no '{control_type}' parameter")
def _control_setter(self, control_type, value):
if control_type not in self._control_info:
raise error.NotSupported(f"{self.model} has no '{control_type}' parameter")
control_name = self._control_info[control_type]["name"]
if not self._control_info[control_type]["is_writable"]:
raise error.NotSupported(f"{self.model} cannot set {control_name} parameter'")
if value != "AUTO":
# Check limits.
max_value = self._control_info[control_type]["max_value"]
if value > max_value:
self.logger.warning(
f"Cannot set {control_name} to {value}, clipping to max value: {max_value}."
)
self._driver.set_control_value(self._handle, control_type, max_value)
return
min_value = self._control_info[control_type]["min_value"]
if value < min_value:
self.logger.warning(
f"Cannot set {control_name} to {value}, clipping to min value: {min_value}."
)
self._driver.set_control_value(self._handle, control_type, min_value)
return
else:
if not self._control_info[control_type]["is_auto_supported"]:
raise error.IllegalValue(f"{self.model} cannot set {control_name} to AUTO")
self._driver.set_control_value(self._handle, control_type, value)