Source code for panoptes.pocs.camera.gphoto.base

import re
import subprocess
import time
from abc import ABC
from typing import List, Dict, Union

from panoptes.utils import error
from panoptes.utils.images import cr2 as cr2_utils
from panoptes.utils.serializers import from_yaml
from panoptes.utils.utils import listify

from panoptes.pocs.camera import AbstractCamera, get_gphoto2_cmd

file_save_re = re.compile(r'Saving file as (.*)')


[docs] class AbstractGPhotoCamera(AbstractCamera, ABC): # pragma: no cover """ Abstract camera class that uses gphoto2 interaction. Args: config(Dict): Config key/value pairs, defaults to empty dict. """ def __init__(self, *arg, **kwargs): super().__init__(*arg, **kwargs) # Set up a holder for the exposure process. self._command_proc = None self.logger.info(f'GPhoto2 camera {self.name} created on {self.port}') @property def temperature(self): return None @property def target_temperature(self): return None @property def cooling_power(self): return None @AbstractCamera.uid.getter def uid(self) -> str: """ A six-digit serial number for the camera """ return self._serial_number[0:6]
[docs] def connect(self): raise NotImplementedError
@property def is_exposing(self): if self._command_proc is not None and self._command_proc.poll() is not None: self._is_exposing_event.clear() return self._is_exposing_event.is_set()
[docs] def process_exposure(self, metadata, **kwargs): """Converts the CR2 to FITS then processes image.""" # Wait for exposure to complete. Timeout handled by exposure thread. while self.is_exposing: time.sleep(0.5) metadata['filepath'] = metadata['filepath'].replace('.cr2', '.fits') super(AbstractGPhotoCamera, self).process_exposure(metadata, **kwargs) self._command_proc = None
[docs] def command(self, cmd: Union[List[str], str], check_exposing: bool = True): """ Run gphoto2 command. """ # Test to see if there is a running command already if self.is_exposing and check_exposing: raise error.InvalidCommand("Command already running") else: # Build the command. run_cmd = [get_gphoto2_cmd()] if self.port is not None: run_cmd.extend(['--port', self.port]) run_cmd.extend(listify(cmd)) self.logger.debug(f"gphoto2 command: {run_cmd!r}") try: self._command_proc = subprocess.Popen( run_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) self.logger.debug(f'Started command on proc={self._command_proc.pid}') except OSError as e: raise error.InvalidCommand(f"Can't send command to gphoto2. {e} \t {run_cmd}") except ValueError as e: raise error.InvalidCommand(f"Bad parameters to gphoto2. {e} \t {run_cmd}") except Exception as e: raise error.PanError(e)
[docs] def get_command_result(self, timeout: float = 10) -> Union[List[str], None]: """ Get the output from the command. Accepts a `timeout` param for communicating with the process. Returns a list of strings corresponding to the output from the gphoto2 camera or `None` if no command has been specified. """ if self._command_proc is None: return None self.logger.debug(f"Getting output from proc {self._command_proc.pid}") try: outs, errs = self._command_proc.communicate(timeout=timeout) except subprocess.TimeoutExpired: self.logger.debug(f"Timeout while waiting. Killing process {self._command_proc.pid}") self._command_proc.kill() outs, errs = self._command_proc.communicate() self.logger.trace(f'gphoto2 output: {outs=!r}') if errs != '': self.logger.warning(f'gphoto2 error: {errs!r}') if isinstance(outs, str): outs = outs.split('\n') self._command_proc = None return outs
[docs] def set_property(self, prop: str, val: Union[str, int], is_value: bool = False, is_index: bool = False): """ Set a property on the camera. Args: prop (str): The property to set. val (str, int): The value to set the property to. is_value (bool): If True, then the value is a literal value. Default False. is_index (bool): If True, then the value is an index. Default False. Raises: ValueError: If the property is not found. """ self.logger.debug(f'Setting {prop=} to {val=}') if is_index: set_cmd = ['--set-config-index', f'{prop}={val}'] elif is_value: set_cmd = ['--set-config-value', f'{prop}={val}'] else: set_cmd = ['--set-config', f'{prop}="{val}"'] self.command(set_cmd) # Forces the command to wait self.get_command_result()
[docs] def set_properties(self, prop2index: Dict[str, int] = None, prop2value: Dict[str, str] = None): """ Sets a number of properties all at once, by index or value. Args: prop2index (dict or None): A dict with keys corresponding to the property to be set and values corresponding to the index option. prop2value (dict or None): A dict with keys corresponding to the property to be set and values corresponding to the literal value. """ if prop2index: for prop, val in prop2index.items(): try: self.set_property(prop, val, is_index=True) except Exception: self.logger.debug(f'Skipping {prop=} {val=}') if prop2value: for prop, val in prop2value.items(): try: self.set_property(prop, val, is_value=True) except Exception: self.logger.debug(f'Skipping {prop=} {val=}')
[docs] def get_property(self, prop: str) -> str: """ Gets a property from the camera """ set_cmd = ['--get-config', f'{prop}'] self.command(set_cmd) result = self.get_command_result() output = '' for line in result: match = re.match(r'Current:\s*(.*)', line) if match: output = match.group(1) return output
[docs] def load_properties(self) -> dict: """ Load properties from the camera. Reads all the configuration properties available via gphoto2 and returns as dictionary. """ self.logger.debug('Getting all properties for gphoto2 camera') self.command(['--list-all-config']) lines = self.get_command_result() properties = {} yaml_string = '' for line in lines: is_id = len(line.split('/')) > 1 is_label = re.match(r'^Label:\s*(.*)', line) is_type = re.match(r'^Type:\s*(.*)', line) is_readonly = re.match(r'^Readonly:\s*(.*)', line) is_current = re.match(r'^Current:\s*(.*)', line) is_choice = re.match(r'^Choice:\s*(\d+)\s*(.*)', line) is_printable = re.match(r'^Printable:\s*(.*)', line) is_help = re.match(r'^Help:\s*(.*)', line) if is_label or is_type or is_current or is_readonly: line = f' {line}' elif is_choice: if int(is_choice.group(1)) == 0: line = f' Choices:\n {int(is_choice.group(1)):d}: {is_choice.group(2)}' else: line = f' {int(is_choice.group(1)):d}: {is_choice.group(2)}' elif is_printable: line = f' {line}' elif is_help: line = f' {line}' elif is_id: line = f'- ID: {line}' elif line == '' or line == 'END': continue else: self.logger.debug(f'Line not parsed: {line}') yaml_string += f'{line}\n' self.logger.debug(yaml_string) properties_list = from_yaml(yaml_string) if isinstance(properties_list, list): for prop in properties_list: if prop['Label']: properties[prop['Label']] = prop else: properties = properties_list return properties
def _poll_exposure(self, readout_args, exposure_time, timeout=None, interval=0.01): """Check the command output from gphoto2 for polling. This will essentially block until the camera is done exposing, which means the super call should not have to wait. """ self.logger.debug(f'Calling get_command_result from base gphoto2 for {self}') # Wait for and clear the _command_proc. outs = self.get_command_result(timeout) self.logger.debug(f'Camera response for {self}: {outs}') self._is_exposing_event.clear() self.logger.debug(f'Exposing event cleared for {self}') super()._poll_exposure(readout_args, exposure_time, timeout=timeout, interval=interval) def _readout(self, filename, headers, *args, **kwargs): self.logger.debug(f'Reading Canon DSLR exposure for {filename=}') try: self.logger.debug(f"Converting CR2 -> FITS: {filename}") fits_path = cr2_utils.cr2_to_fits(filename, headers=headers, remove_cr2=False) except TimeoutError: self.logger.error(f'Error processing exposure for {filename} on {self}') finally: self._readout_complete = True def _set_target_temperature(self, target): return None def _set_cooling_enabled(self, enable): return None
[docs] @classmethod def start_tether(cls, port, filename_pattern: str = '%Y%m%dT%H%M%S.%C'): """Start a tether for gphoto2 auto-download on given port using filename pattern.""" print(f'Starting gphoto2 tether for {port=} using {filename_pattern=}') full_command = [get_gphoto2_cmd(), '--port', port, '--filename', filename_pattern, '--capture-tethered'] # Start tether process. process = subprocess.Popen( full_command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE ) print(f'gphoto2 tether started on {port=} on {process.pid=}') try: process.wait() except KeyboardInterrupt: print(f'Stopping tether on {port=}')
[docs] @classmethod def gphoto_file_download( cls, port: str, filename_pattern: str, only_new: bool = True ): """Downloads (newer) files from the camera on the given port using the filename pattern.""" print(f'Starting gphoto2 download for {port=} using {filename_pattern=}') command = [get_gphoto2_cmd(), '--port', port, '--filename', filename_pattern, '--get-all-files', '--recurse'] if only_new: command.append('--new') completed_proc = subprocess.run(command, capture_output=True) success = completed_proc.returncode >= 0 filenames = list() if success: output = completed_proc.stdout.decode('utf-8').split('\n') for line in output: file_match = file_save_re.match(line) if file_match is not None: fn = file_match.group(1).strip() print(f'Found match {fn}') filenames.append(fn) return filenames