"""Base classes and helpers for DSLR cameras controlled via gphoto2.
Provides AbstractGPhotoCamera, a concrete AbstractCamera subclass that shells out
to the system gphoto2 binary for exposure control and property management. This
module is used by gphoto-based DSLR drivers.
"""
import re
import subprocess
import time
from abc import ABC
from pathlib import Path
from panoptes.utils import error
from panoptes.utils.images import cr2 as cr2_utils
from panoptes.utils.serializers import from_yaml
from panoptes.utils.utils import listify
from panoptes.pocs.camera import AbstractCamera, get_gphoto2_cmd
file_save_re = re.compile(r"Saving file as (.*)")
[docs]
class AbstractGPhotoCamera(AbstractCamera, ABC): # pragma: no cover
"""Abstract camera class that uses gphoto2 interaction.
Args:
config(Dict): Config key/value pairs, defaults to empty dict.
"""
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
# Set up a holder for the exposure process.
self._command_proc = None
self.logger.info(f"GPhoto2 camera {self.name} created on {self.port}")
@property
def temperature(self):
"""Current sensor temperature if available (not typical for DSLRs).
Returns:
None: gphoto2-controlled DSLRs generally do not report sensor temperature.
"""
return None
@property
def target_temperature(self):
"""Target temperature for DSLR sensors (not applicable).
Returns:
None: DSLRs controlled via gphoto2 generally lack regulated cooling.
"""
return None
@property
def cooling_power(self):
"""Cooling power level for the camera, if available.
Returns:
None: DSLRs via gphoto2 typically do not expose cooling power metrics.
"""
return None
@AbstractCamera.uid.getter
def uid(self) -> str:
"""A six-digit serial number for the camera.
Returns:
str: The first six characters of the camera's serial number.
"""
return self._serial_number[0:6]
[docs]
def connect(self):
"""Connect to the DSLR via gphoto2.
Implementations should validate communication to the camera (e.g., by
listing capabilities) and set internal flags required by AbstractCamera.
Raises:
NotImplementedError: This base class does not implement device-specific logic.
"""
raise NotImplementedError
@property
def is_exposing(self):
"""Whether a gphoto2 command (exposure) is still running.
Returns:
bool: True if the current exposure subprocess is active.
"""
if self._command_proc is not None and self._command_proc.poll() is not None:
self._is_exposing_event.clear()
return self._is_exposing_event.is_set()
[docs]
def process_exposure(self, metadata, **kwargs):
"""Convert the CR2 to FITS and pass to common processing.
Args:
metadata (dict): Header metadata saved for the image.
**kwargs: Forwarded to AbstractCamera.process_exposure.
Returns:
None
"""
# Wait for exposure to complete. Timeout handled by exposure thread.
while self.is_exposing:
time.sleep(0.5)
metadata["filepath"] = metadata["filepath"].replace(".cr2", ".fits")
super().process_exposure(metadata, **kwargs)
self._command_proc = None
[docs]
def command(self, cmd: list[str] | str, check_exposing: bool = True):
"""Run a gphoto2 command and start tracking the subprocess.
Args:
cmd (list[str] | str): The gphoto2 arguments to run, e.g.,
["--capture-image-and-download"] or a single string.
check_exposing (bool): If True, prevent starting a new command while an
exposure command is still running. Defaults to True.
Raises:
panoptes.utils.error.InvalidCommand: If a command is already in progress or
parameters are invalid for gphoto2.
panoptes.utils.error.PanError: For unexpected errors starting the subprocess.
Returns:
None
"""
# Test to see if there is a running command already
if self.is_exposing and check_exposing:
raise error.InvalidCommand("Command already running")
else:
# Build the command.
run_cmd = [get_gphoto2_cmd()]
if self.port is not None:
run_cmd.extend(["--port", self.port])
run_cmd.extend(listify(cmd))
self.logger.debug(f"gphoto2 command: {run_cmd!r}")
try:
self._command_proc = subprocess.Popen(
run_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
self.logger.debug(f"Started command on proc={self._command_proc.pid}")
except OSError as e:
raise error.InvalidCommand(f"Can't send command to gphoto2. {e} \t {run_cmd}")
except ValueError as e:
raise error.InvalidCommand(f"Bad parameters to gphoto2. {e} \t {run_cmd}")
except Exception as e:
raise error.PanError(e)
[docs]
def get_command_result(self, timeout: float = 10) -> list[str] | None:
"""Retrieve stdout lines from the last gphoto2 subprocess.
Args:
timeout (float): Seconds to wait for the subprocess to finish before
killing it and collecting output. Defaults to 10.
Returns:
list[str] | None: Lines of stdout from gphoto2, or None if no command
has been initiated.
"""
if self._command_proc is None:
return None
self.logger.debug(f"Getting output from proc {self._command_proc.pid}")
try:
outs, errs = self._command_proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
self.logger.debug(f"Timeout while waiting. Killing process {self._command_proc.pid}")
self._command_proc.kill()
outs, errs = self._command_proc.communicate()
self.logger.trace(f"gphoto2 output: {outs=!r}")
if errs != "":
self.logger.warning(f"gphoto2 error: {errs!r}")
if isinstance(outs, str):
outs = outs.split("\n")
self._command_proc = None
return outs
[docs]
def set_property(self, prop: str, val: str | int, is_value: bool = False, is_index: bool = False):
"""Set a property on the camera.
Args:
prop (str): The property to set.
val (str, int): The value to set the property to.
is_value (bool): If True, then the value is a literal value. Default False.
is_index (bool): If True, then the value is an index. Default False.
Raises:
ValueError: If the property is not found.
"""
self.logger.debug(f"Setting {prop=} to {val=}")
if is_index:
set_cmd = ["--set-config-index", f"{prop}={val}"]
elif is_value:
set_cmd = ["--set-config-value", f"{prop}={val}"]
else:
set_cmd = ["--set-config", f'{prop}="{val}"']
self.command(set_cmd)
# Forces the command to wait
self.get_command_result()
[docs]
def set_properties(self, prop2index: dict[str, int] = None, prop2value: dict[str, str] = None):
"""Sets a number of properties all at once, by index or value.
Args:
prop2index (dict or None): A dict with keys corresponding to the property to
be set and values corresponding to the index option.
prop2value (dict or None): A dict with keys corresponding to the property to
be set and values corresponding to the literal value.
"""
if prop2index:
for prop, val in prop2index.items():
try:
self.set_property(prop, val, is_index=True)
except Exception:
self.logger.debug(f"Skipping {prop=} {val=}")
if prop2value:
for prop, val in prop2value.items():
try:
self.set_property(prop, val, is_value=True)
except Exception:
self.logger.debug(f"Skipping {prop=} {val=}")
[docs]
def get_property(self, prop: str) -> str:
"""Get a configuration property value from the camera.
Args:
prop (str): The gphoto2 property identifier or label to query.
Returns:
str: The current value of the requested property (as reported by gphoto2).
"""
set_cmd = ["--get-config", f"{prop}"]
self.command(set_cmd)
result = self.get_command_result()
output = ""
for line in result:
match = re.match(r"Current:\s*(.*)", line)
if match:
output = match.group(1)
return output
[docs]
def load_properties(self) -> dict:
"""Load properties from the camera.
Reads all the configuration properties available via gphoto2 and returns
as dictionary.
Returns:
dict: A mapping of property labels to their detailed descriptors as
parsed from gphoto2 output.
"""
self.logger.debug("Getting all properties for gphoto2 camera")
self.command(["--list-all-config"])
lines = self.get_command_result()
properties = {}
yaml_string = ""
for line in lines:
is_id = len(line.split("/")) > 1
is_label = re.match(r"^Label:\s*(.*)", line)
is_type = re.match(r"^Type:\s*(.*)", line)
is_readonly = re.match(r"^Readonly:\s*(.*)", line)
is_current = re.match(r"^Current:\s*(.*)", line)
is_choice = re.match(r"^Choice:\s*(\d+)\s*(.*)", line)
is_printable = re.match(r"^Printable:\s*(.*)", line)
is_help = re.match(r"^Help:\s*(.*)", line)
if is_label or is_type or is_current or is_readonly:
line = f" {line}"
elif is_choice:
if int(is_choice.group(1)) == 0:
line = f" Choices:\n {int(is_choice.group(1)):d}: {is_choice.group(2)}"
else:
line = f" {int(is_choice.group(1)):d}: {is_choice.group(2)}"
elif is_printable:
line = f" {line}"
elif is_help:
line = f" {line}"
elif is_id:
line = f"- ID: {line}"
elif line == "" or line == "END":
continue
else:
self.logger.debug(f"Line not parsed: {line}")
yaml_string += f"{line}\n"
self.logger.debug(yaml_string)
properties_list = from_yaml(yaml_string)
if isinstance(properties_list, list):
for prop in properties_list:
if prop["Label"]:
properties[prop["Label"]] = prop
else:
properties = properties_list
return properties
def _poll_exposure(self, readout_args, exposure_time, timeout=None, interval=0.01):
"""Check the command output from gphoto2 for polling.
This will essentially block until the camera is done exposing, which means
the super call should not have to wait.
"""
# Wait for and clear the _command_proc.
try:
self.logger.debug(f"Calling get_command_result from base gphoto2 for {self}")
# Wait for the exposure to complete, this blocks in gphoto2.
outs = self.get_command_result(timeout)
self.logger.debug(f"Exposure complete for {self}, getting readout")
self._readout(*readout_args)
except Exception as err:
self.logger.error(f"Error during readout on {self}: {err!r}")
self._exposure_error = repr(err)
raise err
finally:
self.logger.debug(f"Camera response for {self}: {outs}")
self._is_exposing_event.clear()
self.logger.debug(f"Exposing event cleared for {self}")
def _readout(self, filename, headers, *args, **kwargs):
self.logger.debug(f"Reading Canon DSLR exposure for {filename=}")
try:
if Path(filename).exists():
self.logger.debug(f"Converting CR2 -> FITS: {filename}")
cr2_utils.cr2_to_fits(filename, headers=headers, remove_cr2=False)
else:
self.logger.warning(f"File {filename!r} not found, cannot process.")
except Exception as err:
self.logger.error(f"Error processing exposure for {filename} on {self}: {err!r}")
finally:
self._readout_complete = True
def _set_target_temperature(self, target):
return None
def _set_cooling_enabled(self, enable):
return None
[docs]
@classmethod
def start_tether(cls, port, filename_pattern: str = "%Y%m%dT%H%M%S.%C"):
"""Start a gphoto2 tethering session for auto-download.
Args:
port (str): The gphoto2 port identifier, e.g., "usb:001,005".
filename_pattern (str): The gphoto2 filename pattern to use when saving files
(e.g., "%Y%m%dT%H%M%S.%C"). Defaults to that timestamp-based pattern.
Returns:
None
"""
print(f"Starting gphoto2 tether for {port=} using {filename_pattern=}")
full_command = [
get_gphoto2_cmd(),
"--port",
port,
"--filename",
filename_pattern,
"--capture-tethered",
]
# Start tether process.
process = subprocess.Popen(full_command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
print(f"gphoto2 tether started on {port=} on {process.pid=}")
try:
process.wait()
except KeyboardInterrupt:
print(f"Stopping tether on {port=}")
[docs]
@classmethod
def gphoto_file_download(cls, port: str, filename_pattern: str, only_new: bool = True):
"""Download files from the camera using gphoto2.
Args:
port (str): The gphoto2 port identifier (e.g., "usb:001,005").
filename_pattern (str): Pattern for naming downloaded files.
only_new (bool): If True, only fetch files not previously downloaded. Defaults to True.
Returns:
list[str]: The list of file paths reported as saved by gphoto2.
"""
print(f"Starting gphoto2 download for {port=} using {filename_pattern=}")
command = [
get_gphoto2_cmd(),
"--port",
port,
"--filename",
filename_pattern,
"--get-all-files",
"--recurse",
]
if only_new:
command.append("--new")
completed_proc = subprocess.run(command, capture_output=True)
success = completed_proc.returncode >= 0
filenames = list()
if success:
output = completed_proc.stdout.decode("utf-8").split("\n")
for line in output:
file_match = file_save_re.match(line)
if file_match is not None:
fn = file_match.group(1).strip()
print(f"Found match {fn}")
filenames.append(fn)
return filenames