Source code for panoptes.pocs.camera

from collections import OrderedDict
import re
import shutil
import subprocess
import random
from contextlib import suppress
from typing import Optional

import requests
from pydantic import AnyHttpUrl

from panoptes.pocs.camera.camera import AbstractCamera  # noqa

from panoptes.pocs.utils.logger import get_logger
from panoptes.utils import error
from panoptes.utils.config.client import get_config
from panoptes.utils.library import load_module

logger = get_logger()


[docs] def get_gphoto2_cmd(): """Finds the gphoto2 command on the system""" return shutil.which('gphoto2') or shutil.which('gphoto2', path='/usr/local/bin')
[docs] def list_connected_cameras(endpoint: Optional[AnyHttpUrl] = None): """Detect connected cameras. Uses gphoto2 to try and detect which cameras are connected. Cameras should be known and placed in config but this is a useful utility. Returns: list: A list of the ports with detected cameras. """ result = '' if endpoint is not None: response = requests.post(endpoint, json=dict(arguments='--auto-detect')) if response.ok: result = response.json()['output'] else: gphoto2 = get_gphoto2_cmd() if not gphoto2: # pragma: no cover raise error.NotFound('gphoto2 is missing, please install or use the endpoint option.') command = [gphoto2, '--auto-detect'] result = subprocess.check_output(command).decode('utf-8') lines = result.split('\n') ports = [] for line in lines: camera_match = re.match(r'([\w\d\s_.]{30})\s(usb:\d{3},\d{3})', line) if camera_match: # camera_name = camera_match.group(1).strip() port = camera_match.group(2).strip() ports.append(port) return ports
[docs] def create_cameras_from_config(config=None, cameras=None, auto_primary=True, recreate_existing=False, *args, **kwargs): """Create camera object(s) based on the config. Creates a camera for each camera item listed in the config. Ensures the appropriate camera module is loaded. Args: config (dict or None): A config object for a camera or None to lookup in config-server. cameras (list of panoptes.pocs.camera.Camera or None): A list of camera objects or None. auto_primary (bool): If True, when no camera is marked as the primary camera, the first camera in the list will be used as primary. Default True. recreate_existing (bool): If True, a camera object will be recreated if an existing camera with the same `uid` is already assigned. Should currently only affect cameras that use the `sdk` (i.g. not DSLRs). Default False raises an exception if camera is already assigned. *args (list): Passed to `get_config`. **kwargs (dict): Can pass a `cameras` object that overrides the info in the configuration file. Can also pass `auto_detect`(bool) to try and automatically discover the ports. Any other items as passed to `get_config`. Returns: OrderedDict: An ordered dictionary of created camera objects, with the camera name as key and camera instance as value. Returns an empty OrderedDict if there is no camera configuration items. Raises: error.CameraNotFound: Raised if camera cannot be found at specified port or if auto_detect=True and no cameras are found. error.PanError: Description """ camera_config = config or get_config('cameras', *args, **kwargs) if not camera_config: # cameras section either missing or empty logger.info('No camera information in config.') return None logger.debug(f"camera_config={camera_config!r}") camera_defaults = camera_config.get('defaults', dict()) cameras = cameras or OrderedDict() ports = list() auto_detect = camera_defaults.get('auto_detect', False) endpoint = camera_defaults.get('endpoint', None) # Lookup the connected ports if auto_detect: logger.debug("Auto-detecting ports for cameras") try: ports = list_connected_cameras(endpoint=endpoint) except error.PanError as e: logger.warning(e) if len(ports) == 0: raise error.CameraNotFound(msg="No cameras detected. For testing, use simulator.") else: logger.debug(f"Detected ports={ports!r}") primary_camera = None device_info = camera_config['devices'] for cam_num, cfg in enumerate(device_info): # Get a copy of the camera defaults and update with device config. device_config = camera_defaults.copy() device_config.update(cfg) cam_name = device_config.setdefault('name', f'Cam{cam_num:02d}') # Check for proper connection method. model = device_config['model'] # Assign an auto-detected port. If none are left, skip if auto_detect and 'gphoto' in model: try: device_config['port'] = ports.pop() except IndexError: logger.warning(f"No ports left for {cam_name}, skipping.") continue elif model == 'simulator': device_config['port'] = f'usb:999,{random.randint(0, 1000):03d}' logger.debug(f'Creating camera: {model}') try: module = load_module(model) logger.debug(f'Camera module: module={module!r}') if recreate_existing: with suppress(AttributeError): module._assigned_cameras = set() # We either got a class or a module. if callable(module): camera_obj = module(**device_config) else: if hasattr(module, 'Camera'): camera_obj = module.Camera(**device_config) else: raise error.NotFound(f'module={module!r} does not have a Camera object') except error.NotFound: logger.error(f'Cannot find camera module with config: {device_config}') except Exception as e: logger.error(f'Cannot create camera type: {model} {e!r}') else: # Check if the config specified a primary camera and if it matches. if camera_obj.uid == camera_config.get('primary'): camera_obj.is_primary = True primary_camera = camera_obj logger.debug(f"Camera created: camera={camera_obj!r}") cameras[cam_name] = camera_obj if len(cameras) == 0: raise error.CameraNotFound(msg="No cameras available") # If no camera was specified as primary use the first if primary_camera is None and auto_primary: logger.info(f'No primary camera given, assigning the first camera') primary_camera = list(cameras.values())[0] # First camera primary_camera.is_primary = True logger.info(f"Primary camera: {primary_camera}") logger.success(f"{len(cameras)} cameras created") return cameras