Source code for panoptes.pocs.camera.simulator.dslr

"""DSLR-style simulated camera driver.

Provides a lightweight Camera implementation that mimics a DSLR controlled via
simple timers and sample FITS data, suitable for tests and demos.
"""

import os
import random
from threading import Timer

import numpy as np
from astropy import units as u
from astropy.io import fits

from panoptes.utils.images import fits as fits_utils
from panoptes.utils.time import CountdownTimer
from panoptes.utils.utils import get_quantity_value

from panoptes.pocs.camera import AbstractCamera


[docs] class Camera(AbstractCamera): """Simulated DSLR-like camera using timers and canned FITS data.""" @property def egain(self): """Estimated sensor gain (e-/ADU) used for testing. Returns: int | float: Constant gain value for the simulator (unitless). """ return 1 @property def bit_depth(self): """ADC bit depth for the simulated sensor. Returns: astropy.units.Quantity: Bit depth in bits. """ return 12 * u.bit def __init__(self, name="Simulated Camera", *args, **kwargs): """Create the simulated camera and connect immediately. Args: name (str): Friendly name for the simulated camera. *args: Forwarded to AbstractCamera. **kwargs: Forwarded to AbstractCamera; supports timeout and readout_time. """ kwargs["timeout"] = kwargs.get("timeout", 1.5 * u.second) kwargs["readout_time"] = kwargs.get("readout_time", 1.0 * u.second) super().__init__(name=name, *args, **kwargs) # Create a random serial number if one hasn't been specified if self._serial_number == "XXXXXX": self._serial_number = f"SC{random.randint(0, 9999):04d}" self.connect() self.logger.debug(f"{self.name} connected") self.setup_camera() self.logger.info(f"{self} initialised")
[docs] def connect(self): """Connect to the camera simulator. This is a no-op for the simulator. """ self.logger.debug(f"Connecting to camera simulator {self.name}") self._connected = True
[docs] def setup_camera(self): """Set up the camera simulator. This is a no-op for the simulator. """ self.logger.debug(f"Setting up camera simulator {self.name}") pass
[docs] def take_observation(self, observation, headers=None, filename=None, *args, **kwargs): """Start a simulated observation, trimming long exposures. For the simulator, any requested exposure longer than 1 second is reduced to 1 second to keep tests fast. Args: observation: The Observation describing the target/sequence. headers: Optional FITS header metadata to include. filename: Optional filename stem to use for the output. *args: Forwarded to AbstractCamera.take_observation. **kwargs: May include 'exptime' to set the exposure duration. Returns: dict: The metadata returned by AbstractCamera.take_observation. """ exptime = kwargs.get("exptime", observation.exptime.value) if exptime > 1: kwargs["exptime"] = 1 self.logger.debug("Trimming camera simulator exposure to 1 s") return super().take_observation(observation, headers, filename, **kwargs)
def _end_exposure(self): self._is_exposing_event.clear() def _start_exposure(self, seconds=None, filename=None, dark=False, header=None, *args, **kwargs): self._is_exposing_event.set() seconds = kwargs.get("simulator_exptime", seconds) exposure_thread = Timer( interval=get_quantity_value(seconds, unit=u.second), function=self._end_exposure ) exposure_thread.start() readout_args = (filename, header) return readout_args def _readout(self, filename=None, header=None): self.logger.debug(f"Calling _readout for {self}") timer = CountdownTimer(duration=self.readout_time, name="ReadoutDSLR") # Get example FITS file from test data directory file_path = os.path.join(".", "tests", "data", "unsolved.fits") fake_data = fits.getdata(file_path) if header.get("IMAGETYP") == "Dark Frame": # Replace example data with a bunch of random numbers fake_data = np.random.randint(low=975, high=1026, size=fake_data.shape, dtype=fake_data.dtype) self.logger.debug(f"Writing filename={filename!r} for {self}") self.write_fits(fake_data, header, filename) self.logger.debug(f"Finished writing {filename=}") # Sleep for the remainder of the readout time. timer.sleep() def _do_process_exposure(self, file_path, metadata): file_path = super()._do_process_exposure(file_path, metadata) self.logger.debug("Overriding mount coordinates for camera simulator") # TODO get the path as package data or something better. solved_path = os.path.join(".", "tests", "data", "solved.fits.fz") solved_header = fits_utils.getheader(solved_path) with fits.open(file_path, "update") as f: hdu = f[0] hdu.header.set("RA-MNT", solved_header["RA-MNT"], "Degrees") hdu.header.set("HA-MNT", solved_header["HA-MNT"], "Degrees") hdu.header.set("DEC-MNT", solved_header["DEC-MNT"], "Degrees") self.logger.debug("Headers updated for simulated image.") return file_path def _set_target_temperature(self, target): raise False def _set_cooling_enabled(self, enable): raise False