Source code for panoptes.pocs.mount.ioptron.base

import re
from contextlib import suppress

from astropy import units as u
from astropy.coordinates import SkyCoord, Latitude, Longitude
from astropy.coordinates.earth import EarthLocation
from astropy.time import Time
from panoptes.utils import error as error
from panoptes.utils.time import current_time

from panoptes.pocs.mount.ioptron import MountGPS, MountState, MountTrackingState, MountMovementSpeed, MountTimeSource, \
    MountHemisphere
from panoptes.pocs.mount.serial import AbstractSerialMount


[docs] class Mount(AbstractSerialMount): """Mount class for iOptron mounts.""" def __init__(self, location, mount_version=None, *args, **kwargs): self._mount_version = mount_version or self._mount_version super(Mount, self).__init__(location, *args, **kwargs) self._raw_status = None self._latitude_format = self.commands['latitude_format'] self._longitude_format = self.commands['longitude_format'] self._ra_format = self.commands['ra_format'] self._dec_format = self.commands['dec_format'] self._location_units = self.commands['location_units'] self._ra_coords_units = self.commands['ra_coords_units'] self._dec_coords_units = self.commands['dec_coords_units'] self._status_format = re.compile(self.commands.get('status_format', '*'), flags=re.VERBOSE) self._coords_format = re.compile(self.commands.get('coords_format', '*'), flags=re.VERBOSE) self._state = MountState.UNKNOWN @property def is_home(self): """ Mount home status. """ self.update_status() return self._is_home
[docs] def initialize(self, set_rates=True, unpark=False, *arg, **kwargs): """ Initialize the connection with the mount and setup for location. iOptron mounts are initialized by sending the following two commands to the mount: * MountInfo If the mount is successfully initialized, the `_setup_location_for_mount` method is also called. Returns: bool: Returns the value from `self.is_initialized`. """ if not self.is_connected: self.logger.info(f'Connecting to mount {__name__}') self.connect() if self.is_connected and not self.is_initialized: self.logger.info(f'Initializing {__name__} mount') # We trick the mount into thinking it's initialized while we # initialize otherwise the `query` method will test # to see if initialized and be put into loop. self._is_initialized = True # See if we are using old command set. command_set = self.get_config('commands_file') if command_set == 'ioptron/v140': actual_version_info = self.query('version') expected_version_info = self.commands.get('version').get('response') if actual_version_info != expected_version_info: raise error.MountNotFound('Problem initializing mount - version numbers do not match') actual_mount_info = self.query('mount_info') expected_mount_info = self.commands.get('mount_info').get('response') self._is_initialized = False # Test our init procedure for iOptron if actual_mount_info != expected_mount_info: self.logger.debug(f'{actual_mount_info} != {expected_mount_info}') raise error.MountNotFound('Problem initializing mount') else: self._is_initialized = True self._setup_location_for_mount() if set_rates: self._set_initial_rates() self.logger.info(f'Mount initialized: {self.is_initialized}') return self.is_initialized
[docs] def park(self, ra_direction=None, ra_seconds=None, dec_direction=None, dec_seconds=None, *args, **kwargs ): """Slews to the park position and parks the mount. This still uses a custom park command because the mount will not allow the Declination axis to move below 0 degrees. Note: When mount is parked no movement commands will be accepted. Args: ra_direction (str or None): The direction to move the RA axis. If not provided (the default), then look at config setting, otherwise 'west'. ra_seconds (str or None): The number of seconds to move the RA axis at maximum move speed. If not provided (the default), then look at config setting, otherwise 15 seconds. dec_direction (str or None): The direction to move the Declination axis. If not provided (the default), then look at config setting, otherwise 'north'. dec_seconds (str or None): The number of seconds to move the Declination axis at maximum move speed. If not provided (the default), then look at config setting, otherwise 15 seconds. Returns: bool: indicating success """ if self.is_parked: self.logger.success("Mount is already parked") return self.is_parked # Get the direction and timing ra_direction = ra_direction or self.get_config('mount.settings.park.ra_direction', 'west') ra_seconds = ra_seconds or self.get_config('mount.settings.park.ra_seconds', 15) dec_direction = dec_direction or self.get_config('mount.settings.park.dec_direction', 'north') dec_seconds = dec_seconds or self.get_config('mount.settings.park.dec_seconds', 15) self.unpark() self.query('set_button_moving_rate', 9) self.logger.debug(f'Moving mount to home before parking.') if self.slew_to_home(blocking=True): self.logger.debug( f'Parking mount: RA: {ra_direction} {ra_seconds} seconds, ' f'Dec: {dec_direction} {dec_seconds} seconds' ) self.move_direction(direction=dec_direction, seconds=dec_seconds) self.move_direction(direction=ra_direction, seconds=ra_seconds) self._is_parked = True self.logger.success('Mount successfully parked.') return self.is_parked
def _setup_location_for_mount(self): """ Sets the mount up to the current location. Mount must be initialized first. This uses mount.location (an astropy.coords.EarthLocation) to set most of the params and the rest is read from a config file. Users should not call this directly. Includes: * Latitude set_long * Longitude set_lat * Daylight Savings disable_daylight_savings * Universal Time Offset set_gmt_offset * Current Date set_local_date * Current Time set_local_time """ if not isinstance(self.location, EarthLocation): self.logger.warning('Please set a location before attempting setup') if not self.is_initialized: self.logger.warning('Mount has not been initialized') return self.logger.info('Setting up mount for location') # Location # Adjust the lat/long for format expected by iOptron. coords_unit = getattr(u, self._location_units) lat = self._latitude_format.format(self.location.lat.to(coords_unit).value) lon = self._longitude_format.format(self.location.lon.to(coords_unit).value) self.query('set_long', lon) self.query('set_lat', lat) # Daylight savings and GMT offset. self.query('disable_daylight_savings') gmt_offset = self.get_config('location.gmt_offset', default=0) self.logger.debug(f'Setting GMT offset to {gmt_offset:+04.0f}') self.query('set_gmt_offset', f'{gmt_offset:+04.0f}') # Set the date and time. # Newer firmware has the `set_utc_time` method which sets both the date and time. # Older firmware has `set_local_date` and `set_local_time` which must be called separately. now = current_time() if 'set_utc_time' in self.commands: j2000 = Time(2000, format='jyear') offset_time = (now - j2000).to(u.ms).value self.logger.debug(f'Setting UTC time to {offset_time:0>13.0f}') self.query('set_utc_time', f'{offset_time:0>13.0f}') else: now = now + gmt_offset * u.minute self.query('set_local_time', now.datetime.strftime("%H%M%S")) self.query('set_local_date', now.datetime.strftime("%y%m%d")) def _set_initial_rates(self, alt_limit='+30', meridian_treatment='015'): # Make sure we start at sidereal. self.query('set_sidereal_tracking') self.logger.debug(f'Setting altitude limit to {alt_limit}') self.query('set_altitude_limit', alt_limit) self.logger.debug(f'Setting {meridian_treatment=}') self.query('set_meridian_treatment', meridian_treatment) self.logger.debug('Setting manual moving rate to max') self.query('set_button_moving_rate', 9) self.logger.debug(f"Mount guide rate: {self.query('get_guide_rate')}") def _set_zero_position(self): """ Sets the current position as the zero position. The iOptron allows you to set the current position directly, so we simply call the iOptron command. """ self.logger.info("Setting zero position") return self.query('set_zero_position') def _mount_coord_to_skycoord(self, mount_coords): """ Converts between iOptron RA/Dec format and a SkyCoord Args: mount_coords (str): Coordinates as returned by mount Returns: astropy.SkyCoord: Mount coordinates as astropy SkyCoord with EarthLocation included. """ self.logger.debug(f'Mount coordinates: {mount_coords}') coords_match = self._coords_format.fullmatch(mount_coords) self.logger.debug(f'Mount coordinates match: {coords_match}') coords = None if coords_match is not None: ra_coords_units = getattr(u, self._ra_coords_units) dec_coords_units = getattr(u, self._dec_coords_units) # Turn mount output into appropriate units. ra = (int(coords_match.group('ra')) * ra_coords_units) dec = (int(coords_match.group('dec')) * dec_coords_units) # Old firmware had RA in a time unit. if self._ra_coords_units == 'millisecond': self.logger.debug(f'Converting RA from {self._ra_coords_units} to degrees') ra = (ra.to(u.hour).value * u.hourangle) # Convert to degrees. ra = ra.to(u.deg) dec = dec.to(u.deg) # Add the sign back in. if coords_match.group('dec_sign') == '-': dec = dec * -1 self.logger.debug(f'Creating SkyCoord for {ra=} {dec=}') coords = SkyCoord(ra=ra, dec=dec, frame='icrs', unit=(u.deg, u.deg)) self.logger.debug(f'Created SkyCoord: {coords=}') else: self.logger.warning('Cannot create SkyCoord from mount coordinates') return coords def _skycoord_to_mount_coord(self, coords): """ Converts between SkyCoord and a iOptron RA/Dec format. """ # Do some special handling of older firmware that had RA coords in a time unit. if self._ra_coords_units == 'millisecond': self.logger.debug(f'Converting RA from degrees to {self._ra_coords_units}') ra_coord = (coords.ra.to(u.hourangle).value * u.hour).to(self._ra_coords_units).value else: ra_coord = coords.ra.to(self._ra_coords_units).value dec_coord = coords.dec.to(self._dec_coords_units).value # Convert to a string for the mount. ra_mount = self._ra_format.format(ra_coord) dec_mount = self._dec_format.format(dec_coord) self.logger.debug(f'RA: {ra_coord} <-> {ra_mount=}') self.logger.debug(f'Dec: {dec_coord} <-> {dec_mount=}') return ra_mount, dec_mount def _update_status(self): self._raw_status = self.query('get_status') status = dict() status_match = self._status_format.fullmatch(self._raw_status) if status_match: status_dict = status_match.groupdict() self._state = MountState(int(status_dict['state'])) self._at_mount_park = self.state == MountState.PARKED self._is_home = self.state == MountState.AT_HOME self._is_tracking = self.state == MountState.TRACKING or self.state == MountState.TRACKING_PEC self._is_slewing = self.state == MountState.SLEWING status['state'] = self.state.name status['is_parked'] = self.is_parked status['at_mount_park'] = self._at_mount_park coords_unit = getattr(u, self._location_units) status['longitude'] = Longitude((float(status_dict['longitude']) * coords_unit).to(u.degree)) # Longitude adds +90° to avoid negative numbers, so subtract for original. status['latitude'] = Latitude((float(status_dict['latitude']) * coords_unit).to(u.degree) - (90 * u.degree)) status['gps'] = MountGPS(int(status_dict['gps'])).name status['tracking'] = MountTrackingState(int(status_dict['tracking'])).name self._movement_speed = MountMovementSpeed(int(status_dict['movement_speed'])) status['movement_speed'] = self._movement_speed.name status['time_source'] = MountTimeSource(int(status_dict['time_source'])).name status['hemisphere'] = MountHemisphere(int(status_dict['hemisphere'])).name status['tracking_rate_ra'] = self.tracking_rate ts = self.query('get_timestamp') offset = int(ts[:4]) * u.minute daylight_savings = bool(int(ts[4])) status['timestamp'] = ts status['time_offset'] = offset status['time_daylight_savings'] = daylight_savings if self.commands.get('command_version', 0) == 2.5: year = int(ts[5:7]) month = int(ts[7:9]) day = int(ts[9:11]) hour = int(ts[11:13]) minute = int(ts[13:15]) second = int(ts[15:17]) status['time_local'] = Time(f'20{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}').iso elif self.commands.get('command_version', 0) >= 3.10: with suppress(Exception): now = int(ts[5:]) * u.ms j2000 = Time(2000, format='jyear') status['time_utc'] = (j2000 + now).iso status['time_local'] = (j2000 + now + offset).iso return status def _setup_commands(self, commands): super()._setup_commands(commands) # Update the `MountInfo` response if one has been set on the class. with suppress(AttributeError, KeyError): self.commands['mount_info']['response'] = self._mount_version