import os
import random
from threading import Timer
import numpy as np
from astropy import units as u
from astropy.io import fits
from panoptes.pocs.camera import AbstractCamera
from panoptes.utils.images import fits as fits_utils
from panoptes.utils.time import CountdownTimer
from panoptes.utils.utils import get_quantity_value
[docs]
class Camera(AbstractCamera):
@property
def egain(self):
return 1
@property
def bit_depth(self):
return 12 * u.bit
def __init__(self, name='Simulated Camera', *args, **kwargs):
kwargs['timeout'] = kwargs.get('timeout', 1.5 * u.second)
kwargs['readout_time'] = kwargs.get('readout_time', 1.0 * u.second)
super().__init__(name=name, *args, **kwargs)
self.connect()
self.logger.info(f"{self} initialised")
[docs]
def connect(self):
""" Connect to camera simulator
The simulator merely marks the `connected` property.
"""
# Create a random serial number if one hasn't been specified
if self._serial_number == 'XXXXXX':
self._serial_number = 'SC{:04d}'.format(random.randint(0, 9999))
self._connected = True
self.logger.debug(f'{self.name} connected')
[docs]
def take_observation(self, observation, headers=None, filename=None, *args, **kwargs):
exptime = kwargs.get('exptime', observation.exptime.value)
if exptime > 1:
kwargs['exptime'] = 1
self.logger.debug("Trimming camera simulator exposure to 1 s")
return super().take_observation(observation,
headers,
filename,
**kwargs)
def _end_exposure(self):
self._is_exposing_event.clear()
def _start_exposure(self, seconds=None, filename=None, dark=False, header=None, *args,
**kwargs):
self._is_exposing_event.set()
seconds = kwargs.get('simulator_exptime', seconds)
exposure_thread = Timer(interval=get_quantity_value(seconds, unit=u.second),
function=self._end_exposure)
exposure_thread.start()
readout_args = (filename, header)
return readout_args
def _readout(self, filename=None, header=None):
self.logger.debug(f'Calling _readout for {self}')
timer = CountdownTimer(duration=self.readout_time, name='ReadoutDSLR')
# Get example FITS file from test data directory
file_path = os.path.join('.', 'tests', 'data', 'unsolved.fits')
fake_data = fits.getdata(file_path)
if header.get('IMAGETYP') == 'Dark Frame':
# Replace example data with a bunch of random numbers
fake_data = np.random.randint(low=975, high=1026,
size=fake_data.shape,
dtype=fake_data.dtype)
self.logger.debug(f'Writing filename={filename!r} for {self}')
self.write_fits(fake_data, header, filename)
self.logger.debug(f'Finished writing {filename=}')
# Sleep for the remainder of the readout time.
timer.sleep()
def _do_process_exposure(self, file_path, metadata):
file_path = super()._do_process_exposure(file_path, metadata)
self.logger.debug('Overriding mount coordinates for camera simulator')
# TODO get the path as package data or something better.
solved_path = os.path.join('.', 'tests', 'data', 'solved.fits.fz')
solved_header = fits_utils.getheader(solved_path)
with fits.open(file_path, 'update') as f:
hdu = f[0]
hdu.header.set('RA-MNT', solved_header['RA-MNT'], 'Degrees')
hdu.header.set('HA-MNT', solved_header['HA-MNT'], 'Degrees')
hdu.header.set('DEC-MNT', solved_header['DEC-MNT'], 'Degrees')
self.logger.debug("Headers updated for simulated image.")
return file_path
def _set_target_temperature(self, target):
raise False
def _set_cooling_enabled(self, enable):
raise False