Source code for panoptes.pocs.dome.astrohaven

"""Astrohaven clamshell dome driver using a simple RS-232 protocol.

Implements control for Astrohaven domes driven by a Vision 130 PLC. Provides
open/close helpers and simple status polling based on one-character protocol
codes emitted by the controller.
"""
# Based loosely on the code written by folks at Wheaton College, including:
# https://github.com/goodmanj/domecontrol

import time

from panoptes.pocs.dome import abstract_serial_dome


[docs] class Protocol: """ASCII protocol constants emitted/accepted by the Astrohaven PLC. Encodes status bytes for slit positions and single-character commands used to open/close each shutter and query limits. """ # Status codes, produced when not responding to an input. They are oriented towards # reporting whether the two shutters are fully closed. BOTH_CLOSED = "0" # Both A and B shutters are fully closed. A_IS_CLOSED = "1" # Only shutter A is fully closed. B_IS_CLOSED = "2" # Only shutter B is fully closed. BOTH_OPEN = "3" # Really means both NOT fully closed. # Status codes produced by the dome when not responding to a movement command. STABLE_STATES = (BOTH_CLOSED, BOTH_OPEN, B_IS_CLOSED, A_IS_CLOSED) # Limit responses, when the limit has been reached on a direction of movement. A_OPEN_LIMIT = "x" # Response to asking for A to open, and being at open limit A_CLOSE_LIMIT = "X" # Response to asking for A to close, and being at close limit B_OPEN_LIMIT = "y" # Response to asking for B to open, and being at open limit B_CLOSE_LIMIT = "Y" # Response to asking for B to close, and being at close limit # Command codes, echoed while happening CLOSE_A = "A" OPEN_A = "a" CLOSE_B = "B" OPEN_B = "b" # These codes are documented for an 18' dome, but appear not to work with the 7' domes # we have access to. OPEN_BOTH = "O" CLOSE_BOTH = "C" RESET = "R"
[docs] class AstrohavenDome(abstract_serial_dome.AbstractSerialDome): """Interface to an Astrohaven clamshell dome with a Vision 130 PLC and RS-232 interface. Experience shows that it emits a status byte about once a second, with the codes as described in the Protocol class. """ # TODO(jamessynge): Get these from the config file (i.e. per instance), with these values # as defaults, though LISTEN_TIMEOUT can just be the timeout config for SerialData. LISTEN_TIMEOUT = 3 # Max number of seconds to wait for a response. MOVE_TIMEOUT = 10 # Max number of seconds to run the door motors. MOVE_LISTEN_TIMEOUT = 0.1 # When moving, how long to wait for feedback. NUM_CLOSE_FEEDBACKS = 2 # Number of target_feedback bytes needed. def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # TODO(jamessynge): Consider whether to expose settings of the pyserial object thru # rs232.SerialData. Probably should. Could use newer dictionary get/set mechanism so # that change to SerialData is minimal. Alternately, provide a means of reading # that info from the config file in AbstractSerialDome.__init__ and using it to # initialize the SerialData instance. # Let's use a timeout that is long enough so that we are "guaranteed" a byte of output # from the device. 1 second seems too small given that it appears that is the pace of # output from the PLC. # TODO(jamessynge): Remove this, replace with a value in the config file. self.serial.ser.timeout = AstrohavenDome.LISTEN_TIMEOUT @property def is_open(self): """Whether both clamshell shutters are reported open by the controller.""" v = self._read_latest_state() return v == Protocol.BOTH_OPEN
[docs] def open(self): """Open both clamshell shutters to their limits. Returns: bool: True if the stable state after motion reports BOTH_OPEN. """ self._full_move(Protocol.OPEN_A, Protocol.A_OPEN_LIMIT) self._full_move(Protocol.OPEN_B, Protocol.B_OPEN_LIMIT) v = self._read_state_until_stable() if v == Protocol.BOTH_OPEN: return True self.logger.warning(f"AstrohavenDome.open wrong final state: {v!r}") return False
@property def is_closed(self): """Whether both clamshell shutters are reported fully closed.""" v = self._read_latest_state() return v == Protocol.BOTH_CLOSED
[docs] def close(self): """Close both clamshell shutters to their fully closed positions. Returns: bool: True if the stable state after motion reports BOTH_CLOSED. """ self._full_move( Protocol.CLOSE_A, Protocol.A_CLOSE_LIMIT, feedback_countdown=AstrohavenDome.NUM_CLOSE_FEEDBACKS, ) self._full_move( Protocol.CLOSE_B, Protocol.B_CLOSE_LIMIT, feedback_countdown=AstrohavenDome.NUM_CLOSE_FEEDBACKS, ) v = self._read_state_until_stable() if v == Protocol.BOTH_CLOSED: return True self.logger.warning(f"AstrohavenDome.close wrong final state: {v!r}") return False
@property def status(self): """Return a dict with dome's current status.""" status_lookup = { Protocol.BOTH_CLOSED: "closed_both", Protocol.A_IS_CLOSED: "closed_a", Protocol.B_IS_CLOSED: "closed_b", Protocol.BOTH_OPEN: "open_both", } state = self._read_latest_state() return_status = dict( connected=self.is_connected, ) try: return_status["open"] = status_lookup[state] except KeyError: return_status["open"] = f"Unexpected response from Astrohaven Dome Controller: {state!r}" return return_status def __str__(self): if self.is_connected: return self.status return "Disconnected" # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def _read_latest_state(self): """Read and return the latest output from the Astrohaven dome controller.""" # TODO(jamessynge): Add the ability to do a non-blocking read of the available input # from self.serial. If there is some input, return it, but don't wait for more. The last # received byte is good enough for our purposes... as long as we drained the input buffer # before sending a command to the dome. self.serial.reset_input_buffer() data = self.serial.read_bytes(size=1) if len(data): return chr(data[-1]) return None def _read_state_until_stable(self): """Read the status until it reaches one of the stable values.""" end_by = time.time() + AstrohavenDome.LISTEN_TIMEOUT c = "" while True: data = self.serial.read_bytes(size=1) if data: c = chr(data[-1]) if c in Protocol.STABLE_STATES: return c self.logger.debug(f"_read_state_until_stable not yet stable: data={data!r}") if time.time() < end_by: continue pass return c def _full_move(self, send, target_feedback, feedback_countdown=1): """Send a command code until the target_feedback is recieved, or a timeout is reached. Args: send: The command code to send; this is a string of one ASCII character. See Protocol above for the command codes. target_feedback: The response code to compare to the response from the dome; this is a string of one ASCII character. See Protocol above for the codes; while the dome is moving, it echoes the command code sent. Returns: True if the target_feedback is received from the dome before the MOVE_TIMEOUT; False otherwise. """ # Set a short timeout on reading, so that we don't open or close slowly. # In other words, we'll try to read status, but if it isn't available, # we'll just send another command. saved_timeout = self.serial.ser.timeout self.serial.ser.timeout = AstrohavenDome.MOVE_LISTEN_TIMEOUT try: have_seen_send = False end_by = time.time() + AstrohavenDome.MOVE_TIMEOUT self.serial.reset_input_buffer() # Note that there is no wait in this loop because we have a timeout on reading from # the the dome controller, and we know that the dome doesn't echo every character that # we send to it. while True: self.serial.write(send) data = self.serial.read_bytes(size=1) if data: c = chr(data[-1]) if c == target_feedback: feedback_countdown -= 1 self.logger.debug(f"Got target_feedback, feedback_countdown={feedback_countdown!r}") if feedback_countdown <= 0: # Woot! Moved the dome and got the desired response. return True elif c == send: have_seen_send = True elif not have_seen_send and c in Protocol.STABLE_STATES: # pragma: no cover # At the start of looping, we may see the previous stable state until # we start seeing the echo of `send`. pass else: # pragma: no cover self.logger.warning( f"Unexpected value from dome! send={send!r} " f"target_feedback={target_feedback!r} data={data!r}" ) if time.time() < end_by: continue self.logger.error( f"Timed out moving the dome. Check for hardware or communications problem. " f"send={send!r} target_feedback={target_feedback!r} data={data!r}" ) return False finally: self.serial.ser.timeout = saved_timeout
# Expose as Dome so that we can generically load by module name, without knowing the specific type # of dome. But for testing, it make sense to *know* that we're dealing with the correct class. Dome = AstrohavenDome