# Source code for panoptes.pocs.camera.gphoto.base

"""Base classes and helpers for DSLR cameras controlled via gphoto2.

Provides AbstractGPhotoCamera, an abstract AbstractCamera subclass that shells out
to the system gphoto2 binary for exposure control and property management. Device
connection is left to subclasses; this module is used by gphoto-based DSLR drivers.
"""

import re
import subprocess
import time
from abc import ABC
from pathlib import Path

from panoptes.utils import error
from panoptes.utils.images import cr2 as cr2_utils
from panoptes.utils.serializers import from_yaml
from panoptes.utils.utils import listify

from panoptes.pocs.camera import AbstractCamera, get_gphoto2_cmd

# Matches the "Saving file as <path>" lines found in gphoto2's stdout; group 1
# captures everything after the prefix. Used by gphoto_file_download with
# `.match`, so the text must appear at the very start of the line.
file_save_re = re.compile(r"Saving file as (.*)")


class AbstractGPhotoCamera(AbstractCamera, ABC):  # pragma: no cover
    """Abstract camera class that uses gphoto2 interaction.

    Every camera operation is performed by spawning the gphoto2 executable
    (located via ``get_gphoto2_cmd``) as a subprocess. At most one subprocess
    handle is tracked at a time in ``self._command_proc``.

    Args:
        *arg: Positional arguments forwarded unchanged to ``AbstractCamera``.
        **kwargs: Keyword arguments forwarded unchanged to ``AbstractCamera``.
    """

    # Property fields from `gphoto2 --list-all-config` that are copied verbatim
    # (indented) into the generated YAML document.
    _CONFIG_FIELD_RE = re.compile(r"^(Label|Type|Readonly|Current|Printable|Help):\s*(.*)")
    # A `Choice: <index> <text>` line from `gphoto2 --list-all-config`.
    _CONFIG_CHOICE_RE = re.compile(r"^Choice:\s*(\d+)\s*(.*)")

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)

        # Set up a holder for the exposure process.
        self._command_proc = None

        self.logger.info(f"GPhoto2 camera {self.name} created on {self.port}")

    @property
    def temperature(self):
        """Current sensor temperature; always None for this class.

        Returns:
            None: This base class does not read a sensor temperature.
        """
        return None

    @property
    def target_temperature(self):
        """Target sensor temperature; always None for this class.

        Returns:
            None: This base class does not track a target temperature.
        """
        return None

    @property
    def cooling_power(self):
        """Cooling power level; always None for this class.

        Returns:
            None: This base class does not report cooling power.
        """
        return None

    @AbstractCamera.uid.getter
    def uid(self) -> str:
        """Short identifier derived from the camera's serial number.

        Returns:
            str: The first six characters of ``self._serial_number``.
        """
        return self._serial_number[0:6]

    def connect(self):
        """Connect to the DSLR via gphoto2.

        Subclasses must override this with the device-specific logic.

        Raises:
            NotImplementedError: Always; this base class has no implementation.
        """
        raise NotImplementedError

    @property
    def is_exposing(self):
        """Whether an exposure is still flagged as in progress.

        As a side effect, if the tracked subprocess has already exited the
        exposing event is cleared before its state is returned.

        Returns:
            bool: True if the exposing event is still set.
        """
        if self._command_proc is not None and self._command_proc.poll() is not None:
            self._is_exposing_event.clear()

        return self._is_exposing_event.is_set()

    def process_exposure(self, metadata, **kwargs):
        """Wait for the exposure, then hand off to the common processing.

        The ``filepath`` entry of ``metadata`` is rewritten in place so that
        ``.cr2`` becomes ``.fits`` before the parent implementation is called.

        Args:
            metadata (dict): Header metadata saved for the image; must contain
                a ``filepath`` string.
            **kwargs: Forwarded to ``AbstractCamera.process_exposure``.

        Returns:
            None
        """
        # Wait for exposure to complete. Timeout handled by exposure thread.
        while self.is_exposing:
            time.sleep(0.5)

        metadata["filepath"] = metadata["filepath"].replace(".cr2", ".fits")
        super().process_exposure(metadata, **kwargs)

        # Drop the handle of the finished subprocess.
        self._command_proc = None

    def command(self, cmd: list[str] | str, check_exposing: bool = True):
        """Run a gphoto2 command and start tracking the subprocess.

        The subprocess is started but not waited for; use
        ``get_command_result`` to collect its output.

        Args:
            cmd (list[str] | str): The gphoto2 arguments to run, e.g.,
                ["--capture-image-and-download"] or a single string.
            check_exposing (bool): If True, refuse to start a new command while
                the camera is flagged as exposing. Defaults to True.

        Raises:
            panoptes.utils.error.InvalidCommand: If the camera is exposing and
                `check_exposing` is True, or if starting the subprocess raises
                OSError or ValueError.
            panoptes.utils.error.PanError: For any other error raised while
                starting the subprocess.

        Returns:
            None
        """
        # `is_exposing` is evaluated first on purpose: reading it also clears a
        # stale exposing event when the previous subprocess has exited.
        if self.is_exposing and check_exposing:
            raise error.InvalidCommand("Command already running")

        # Build the command.
        run_cmd = [get_gphoto2_cmd()]
        if self.port is not None:
            run_cmd.extend(["--port", self.port])
        run_cmd.extend(listify(cmd))

        self.logger.debug(f"gphoto2 command: {run_cmd!r}")

        try:
            self._command_proc = subprocess.Popen(
                run_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            raise error.InvalidCommand(f"Can't send command to gphoto2. {e} \t {run_cmd}") from e
        except ValueError as e:
            raise error.InvalidCommand(f"Bad parameters to gphoto2. {e} \t {run_cmd}") from e
        except Exception as e:
            raise error.PanError(e) from e
        else:
            self.logger.debug(f"Started command on proc={self._command_proc.pid}")

    def get_command_result(self, timeout: float = 10) -> list[str] | None:
        """Retrieve stdout lines from the last gphoto2 subprocess.

        The tracked subprocess handle is cleared before returning, so a second
        call without a new ``command`` returns None.

        Args:
            timeout (float): Seconds to wait for the subprocess to finish before
                killing it and collecting output. Defaults to 10.

        Returns:
            list[str] | None: Lines of stdout from gphoto2, or None if no
                command is currently tracked.
        """
        if self._command_proc is None:
            return None

        self.logger.debug(f"Getting output from proc {self._command_proc.pid}")

        try:
            outs, errs = self._command_proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            self.logger.debug(f"Timeout while waiting. Killing process {self._command_proc.pid}")
            self._command_proc.kill()
            # Collect whatever was written before the process was killed.
            outs, errs = self._command_proc.communicate()

        self.logger.trace(f"gphoto2 output: {outs=!r}")
        if errs:
            self.logger.warning(f"gphoto2 error: {errs!r}")

        if isinstance(outs, str):
            outs = outs.split("\n")

        self._command_proc = None

        return outs

    def set_property(self, prop: str, val: str | int, is_value: bool = False, is_index: bool = False):
        """Set a property on the camera and wait for gphoto2 to finish.

        Args:
            prop (str): The property to set.
            val (str, int): The value to set the property to.
            is_value (bool): If True, pass `val` via ``--set-config-value``.
                Default False.
            is_index (bool): If True, pass `val` via ``--set-config-index``;
                takes precedence over `is_value`. Default False.

        Raises:
            panoptes.utils.error.InvalidCommand: Propagated from ``command``,
                e.g. when the camera is currently exposing.
        """
        self.logger.debug(f"Setting {prop=} to {val=}")
        if is_index:
            set_cmd = ["--set-config-index", f"{prop}={val}"]
        elif is_value:
            set_cmd = ["--set-config-value", f"{prop}={val}"]
        else:
            set_cmd = ["--set-config", f'{prop}="{val}"']

        self.command(set_cmd)

        # Forces the command to wait
        self.get_command_result()

    def set_properties(
        self,
        prop2index: dict[str, int] | None = None,
        prop2value: dict[str, str] | None = None,
    ):
        """Sets a number of properties all at once, by index or value.

        Index-based properties are applied first, then value-based ones. This
        is best effort: a property that fails to set is logged and skipped.

        Args:
            prop2index (dict or None): A dict with keys corresponding to the
                property to be set and values corresponding to the index option.
            prop2value (dict or None): A dict with keys corresponding to the
                property to be set and values corresponding to the literal value.
        """
        batches = (
            (prop2index, {"is_index": True}),
            (prop2value, {"is_value": True}),
        )
        for mapping, mode in batches:
            for prop, val in (mapping or {}).items():
                try:
                    self.set_property(prop, val, **mode)
                except Exception as err:
                    # Deliberately non-fatal; record why the property was skipped.
                    self.logger.debug(f"Skipping {prop=} {val=}: {err!r}")

    def get_property(self, prop: str) -> str:
        """Get a configuration property value from the camera.

        Args:
            prop (str): The gphoto2 property identifier or label to query.

        Returns:
            str: The text following the last ``Current:`` line in the gphoto2
                output, or an empty string if there is no such line.
        """
        self.command(["--get-config", f"{prop}"])
        result = self.get_command_result()

        output = ""
        for line in result or []:
            match = re.match(r"Current:\s*(.*)", line)
            if match:
                output = match.group(1)

        return output

    def load_properties(self) -> dict:
        """Load properties from the camera.

        Runs ``gphoto2 --list-all-config``, rewrites the output as a YAML list
        with one entry per property and parses it with ``from_yaml``.

        Returns:
            dict: A mapping of property labels to their parsed entries. If the
                parsed YAML is not a list it is returned unchanged.
        """
        self.logger.debug("Getting all properties for gphoto2 camera")

        self.command(["--list-all-config"])
        lines = self.get_command_result()

        yaml_lines = []
        for line in lines or []:
            choice = self._CONFIG_CHOICE_RE.match(line)
            if self._CONFIG_FIELD_RE.match(line):
                # Keys of a list item must line up with `ID`, which sits two
                # columns in (after the "- " marker).
                entry = f"  {line}"
            elif choice:
                index = int(choice.group(1))
                entry = f"    {index:d}: {choice.group(2)}"
                if index == 0:
                    # The first choice opens the nested `Choices` mapping.
                    entry = f"  Choices:\n{entry}"
            elif "/" in line:
                # Any other line containing a slash is taken as a property path
                # and starts a new list item.
                entry = f"- ID: {line}"
            elif line in ("", "END"):
                continue
            else:
                # NOTE(review): unparsed lines are logged but still emitted
                # verbatim (as in the original); confirm that is intended.
                self.logger.debug(f"Line not parsed: {line}")
                entry = line
            yaml_lines.append(entry)

        yaml_string = "".join(f"{entry}\n" for entry in yaml_lines)
        self.logger.debug(yaml_string)
        properties_list = from_yaml(yaml_string)

        if not isinstance(properties_list, list):
            return properties_list

        # Index by label, ignoring entries that have no (or an empty) label.
        return {
            prop["Label"]: prop
            for prop in properties_list
            if isinstance(prop, dict) and prop.get("Label")
        }

    def _poll_exposure(self, readout_args, exposure_time, timeout=None, interval=0.01):
        """Wait for the gphoto2 exposure command, then run the readout.

        This blocks in ``get_command_result`` until the subprocess finishes (or
        `timeout` expires) and always clears the exposing event on exit.

        Args:
            readout_args: Positional arguments unpacked into ``_readout``.
            exposure_time: Not used by this implementation.
            timeout (float | None): Passed to ``get_command_result``.
            interval (float): Not used by this implementation.

        Raises:
            Exception: Whatever ``get_command_result`` or ``_readout`` raised,
                after it has been logged and stored in ``_exposure_error``.
        """
        # Bound up front so the `finally` block can log it even when
        # get_command_result itself raises.
        outs = None
        try:
            self.logger.debug(f"Calling get_command_result from base gphoto2 for {self}")
            # Wait for the exposure to complete, this blocks in gphoto2.
            outs = self.get_command_result(timeout)
            self.logger.debug(f"Exposure complete for {self}, getting readout")
            self._readout(*readout_args)
        except Exception as err:
            self.logger.error(f"Error during readout on {self}: {err!r}")
            self._exposure_error = repr(err)
            raise
        finally:
            self.logger.debug(f"Camera response for {self}: {outs}")
            self._is_exposing_event.clear()
            self.logger.debug(f"Exposing event cleared for {self}")

    def _readout(self, filename, headers, *args, **kwargs):
        """Convert the downloaded CR2 file to FITS if it exists.

        Errors are logged rather than raised, and ``_readout_complete`` is set
        to True regardless of the outcome.

        Args:
            filename: Path of the CR2 file to convert.
            headers: Passed as ``headers`` to ``cr2_utils.cr2_to_fits``.
            *args: Ignored.
            **kwargs: Ignored.
        """
        self.logger.debug(f"Reading Canon DSLR exposure for {filename=}")
        try:
            if Path(filename).exists():
                self.logger.debug(f"Converting CR2 -> FITS: {filename}")
                cr2_utils.cr2_to_fits(filename, headers=headers, remove_cr2=False)
            else:
                self.logger.warning(f"File {filename!r} not found, cannot process.")
        except Exception as err:
            self.logger.error(f"Error processing exposure for {filename} on {self}: {err!r}")
        finally:
            self._readout_complete = True

    def _set_target_temperature(self, target):
        """No-op; `target` is ignored and None is returned."""
        return None

    def _set_cooling_enabled(self, enable):
        """No-op; `enable` is ignored and None is returned."""
        return None

    @classmethod
    def start_tether(cls, port, filename_pattern: str = "%Y%m%dT%H%M%S.%C"):
        """Start a gphoto2 tethering session and block until it ends.

        Runs ``gphoto2 --capture-tethered`` and waits for it to exit. A
        KeyboardInterrupt stops the tether process.

        Args:
            port (str): The gphoto2 port identifier, e.g., "usb:001,005".
            filename_pattern (str): The gphoto2 filename pattern to use when
                saving files. Defaults to "%Y%m%dT%H%M%S.%C".

        Returns:
            None
        """
        print(f"Starting gphoto2 tether for {port=} using {filename_pattern=}")

        full_command = [
            get_gphoto2_cmd(),
            "--port",
            port,
            "--filename",
            filename_pattern,
            "--capture-tethered",
        ]

        # The output is never read, so discard it instead of piping it: waiting
        # on a child whose unread pipe fills up would block forever.
        process = subprocess.Popen(
            full_command,
            stderr=subprocess.STDOUT,
            stdout=subprocess.DEVNULL,
        )
        print(f"gphoto2 tether started on {port=} on {process.pid=}")

        try:
            process.wait()
        except KeyboardInterrupt:
            print(f"Stopping tether on {port=}")
            # Make sure the child does not outlive this call.
            if process.poll() is None:
                process.terminate()
                process.wait()

    @classmethod
    def gphoto_file_download(cls, port: str, filename_pattern: str, only_new: bool = True):
        """Download files from the camera using gphoto2.

        Args:
            port (str): The gphoto2 port identifier (e.g., "usb:001,005").
            filename_pattern (str): Pattern for naming downloaded files.
            only_new (bool): If True, add gphoto2's ``--new`` flag to the
                command. Defaults to True.

        Returns:
            list[str]: The file paths reported as saved in gphoto2's stdout.
        """
        print(f"Starting gphoto2 download for {port=} using {filename_pattern=}")
        command = [
            get_gphoto2_cmd(),
            "--port",
            port,
            "--filename",
            filename_pattern,
            "--get-all-files",
            "--recurse",
        ]
        if only_new:
            command.append("--new")

        completed_proc = subprocess.run(command, capture_output=True)

        filenames = []
        # A negative return code means the process was killed by a signal.
        # NOTE(review): a non-zero exit status is still parsed for saved files
        # here - confirm that is intended rather than `== 0`.
        if completed_proc.returncode >= 0:
            for line in completed_proc.stdout.decode("utf-8").split("\n"):
                file_match = file_save_re.match(line)
                if file_match is not None:
                    fn = file_match.group(1).strip()
                    print(f"Found match {fn}")
                    filenames.append(fn)

        return filenames