# Source code for panoptes.pocs.sensor.power

"""Power board sensor interface for relays and current/voltage readings.

Provides PowerBoard, a serial-connected reader/controller for the PANOPTES
power relay board (Arduino + relay shield), and a small Relay data class for
labels/states. Includes helpers to toggle relays and to parse/aggregate
telemetry into rolling means and status dictionaries.
"""

import time
from collections.abc import Callable
from contextlib import suppress
from dataclasses import dataclass
from enum import IntEnum
from functools import partial

import pandas as pd
from astropy import units as u

from panoptes.utils import error
from panoptes.utils.serial.device import SerialDevice, find_serial_port
from panoptes.utils.serializers import from_json, to_json
from panoptes.utils.time import current_time

from panoptes.pocs.base import PanBase


class PinState(IntEnum):
    """Represents a HIGH or LOW state for a digital output pin.

    ``LOW`` and ``HIGH`` are deliberate aliases of ``OFF`` and ``ON``: they
    share the same integer values, so ``PinState.LOW is PinState.OFF`` and
    iteration yields only the two canonical members (``OFF`` and ``ON``).
    """

    OFF = 0
    LOW = 0
    ON = 1
    HIGH = 1
# Glyph shown for each relay state when a PowerBoard is rendered via `str()`.
POWER_SYMBOLS = {PinState.OFF: "⭘", PinState.ON: "⏽"}
class TruckerBoardCommands(IntEnum):
    """The Trucker Board can accept a series of commands for controlling the relays.

    The integer value of each member is what gets sent to the board as the
    ``power`` field of a relay command.
    """

    OFF = 0
    ON = 1
    TOGGLE = 2  # Toggle relay.
    # Cycle the current state with a 30 second delay.
    # NOTE(review): the delay is applied by the firmware, not this module —
    # confirm the 30 s figure against the Arduino sketch.
    CYCLE_DELAY = 3
class TruckerRelayIndex(IntEnum):
    """The available relays on the Trucker Board.

    Member names double as the relay keys accepted in the ``relays``
    configuration (looked up by name), and member values are the relay
    numbers sent to the board.
    """

    RELAY_0 = 0
    RELAY_1 = 1
    RELAY_2 = 2
    RELAY_3 = 3
    RELAY_4 = 4
@dataclass
class Relay:
    """Relay data class.

    Holds the identity (name/index/label) and the last known state of one
    relay. The control methods defined here are placeholders: when a relay is
    created by ``PowerBoard.setup_relays`` they are replaced on the instance
    with callables bound to that board, so calling them on a bare ``Relay``
    does nothing.
    """

    # Configuration key of the relay, e.g. "RELAY_0".
    name: str
    # Relay number on the board, used when sending commands.
    relay_index: TruckerRelayIndex
    # Friendly label, e.g. "mount".
    label: str | None
    # Last known pin state.
    state: PinState | None = PinState.OFF
    # State requested for the relay at board start-up.
    default_state: PinState | None = PinState.OFF

    def turn_on(self):
        """Turn this relay ON (helper bound by PowerBoard.setup_relays)."""
        pass

    def turn_off(self):
        """Turn this relay OFF (helper bound by PowerBoard.setup_relays)."""
        pass

    def toggle_relay(self):
        """Toggle this relay from OFF to ON or vice versa."""
        pass

    def cycle_relay(self):
        """Cycle this relay (toggle with a small delay), as supported by firmware."""
        pass

    def __str__(self):
        # `state` is annotated as optional, so don't assume it has a `.name`.
        state_name = self.state.name if self.state is not None else "UNKNOWN"
        return f"[{self.name}] {self.label} {state_name}"
class PowerBoard(PanBase):
    """Power distribution and monitoring.

    This represents a "trucker" board for PANOPTES, which is a combination of an
    Arduino Uno and an Infineon 24V relay shield.

    The relay shield has three PROFETs on them that are capable of sensing the
    current through the relay. RELAY_4 has a dedicated PROFET. The other two
    PROFETs can switch between two relays depending on the status of the
    appropriate DSEL pin.

    This class uses `panoptes.utils.serial.device.SerialDevice` for threaded
    (async) reading of the values from the Arduino Uno connected to the power
    board, which are parsed by the default callback and appended to a `deque`
    whose size can be controlled.

    A custom callback can be passed that should accept a single string
    parameter. The default callback returns one row (a list) per reading.

    Pin names specified above correspond to Infineon terminology. See manual:
    https://bit.ly/2IGgWLQ.
    """

    def __init__(
        self,
        port: str | None = None,
        name: str = "Power Board",
        relays: dict[str, dict] | None = None,
        reader_callback: Callable[[dict], dict] = None,
        mean_interval: int | None = 5,
        arduino_board_name: str = "power_board",
        *args,
        **kwargs,
    ):
        """Initialize the power board.

        The `relays` should be a dictionary with the relay name as key and a
        dict with `label` and `default_state` entries. Example:

            RELAY_0:
                label: mount
                default_state: on

        Args:
            port (str, optional): The dev port for the arduino, if not provided,
                search for port matching the vendor (2341) and product id (0043).
            name (str): The user-friendly name for the power board.
            relays (dict[str, dict] or None): The relay configuration. See notes
                for details. `None` configures no relays.
            reader_callback (Callable): A callback for the serial readings. The
                default callback will update the pin values and record one row
                per reading, which is then made into a dataframe with
                `to_dataframe`.
            mean_interval (int or None): When taking a rolling mean, use this
                many seconds, default 5. `None` averages over every reading
                currently held.
            arduino_board_name (str): The name of the arduino board to match in
                the callback and the collection name for storing in `record`.

        Raises:
            error.NotFound: If no `port` is given and none can be found.
        """
        super().__init__(*args, **kwargs)

        if port is None:
            port = PowerBoard.lookup_port(**kwargs)
            if port is None:
                raise error.NotFound("Failed to automatically find port for PowerBoard.")
            self.logger.info(f"Guessing that arduino is on {port=}")

        self.port = port
        self.name = name
        # Honour the caller's board name: the reader callback compares incoming
        # data against it and `record` uses it as the default collection.
        self.arduino_board_name = arduino_board_name
        # Set before the serial reader starts so `readings` is always usable.
        self._mean_interval = mean_interval

        reader_callback = reader_callback or self.default_reader_callback

        self.logger.debug(f"Setting up Power board connection for {name=} on {self.port}")
        # Number of initial serial lines the default callback discards.
        self._ignore_readings = 5
        self.arduino_board = SerialDevice(
            port=self.port,
            serial_settings=dict(baudrate=9600),
            reader_callback=reader_callback,
            name=arduino_board_name,
        )

        self.relays: list[Relay] = list()
        self.relay_labels: dict[str, Relay] = dict()
        self.setup_relays(relays)

        # NOTE(review): presumably gives the Arduino time to come up after the
        # serial connection is opened — confirm.
        time.sleep(2)

        # Set initial relay states.
        for relay in self.relays:
            self.logger.info(f"Setting {relay.label} to {relay.default_state.name}")
            self.change_relay_state(relay, TruckerBoardCommands(relay.default_state))

        self.logger.info("Power board initialized")

    @property
    def status(self):
        """Summarize relay states and latest readings.

        Returns:
            dict: Mapping of relay name to a dict with label, state, and
                reading, plus top-level 'ac_ok' and 'battery_low' entries.
                Empty if no readings are available yet.
        """
        readings = self.readings
        if not readings:
            self.logger.warning(
                "No readings available. If system just started please wait a moment."
            )
            return {}

        status = {
            r.name: dict(label=r.label, state=r.state.name, reading=readings[r.label])
            for r in self.relays
        }
        status["ac_ok"] = readings["ac_ok"]
        status["battery_low"] = readings["battery_low"]

        return status

    @property
    def readings(self):
        """Return the rolling mean of the readings.

        Returns:
            dict: Column name to integer mean over the last `mean_interval`
                seconds, with 'ac_ok' and 'battery_low' replaced by the most
                recent values as booleans (or None if unavailable). Empty if
                there are no readings in the window.
        """
        df = self.to_dataframe()
        # Without an interval there is no window to apply; use everything held.
        if self._mean_interval is not None:
            time_start = (current_time() - self._mean_interval * u.second).to_datetime()
            df = df[time_start:]

        if df.empty:
            return {}

        values = df.mean().astype("int").to_dict()

        # Add the most recent ac_ok and battery_low check.
        try:
            values["ac_ok"] = bool(df.iloc[-1]["ac_ok"])
            values["battery_low"] = bool(df.iloc[-1]["battery_low"])
        except (KeyError, IndexError):
            values["ac_ok"] = None
            values["battery_low"] = None

        return values

    def turn_on(self, label):
        """Turns on the relay with the given label."""
        self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.ON)

    def turn_off(self, label):
        """Turns off the relay with the given label."""
        self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.OFF)

    def toggle_relay(self, label):
        """Toggle relay from OFF<->ON"""
        self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.TOGGLE)

    def cycle_relay(self, label):
        """Cycle the relay, with the delay applied by the board firmware.

        NOTE(review): the `TruckerBoardCommands.CYCLE_DELAY` comment says
        30 seconds while this docstring previously said 5 — confirm against
        the firmware.
        """
        self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.CYCLE_DELAY)

    def to_dataframe(self, **kwargs):
        """Make a dataframe from the latest readings.

        Args:
            **kwargs: Accepted for compatibility; currently unused.

        Returns:
            pandas.DataFrame: Readings indexed by 'time' with 'ac_ok',
                'battery_low' and one column per relay label, or an empty
                frame with a DatetimeIndex if the readings cannot be framed.
        """
        try:
            columns = ["time", "ac_ok", "battery_low"] + list(self.relay_labels.keys())
            df0 = pd.DataFrame(self.arduino_board.readings, columns=columns)
            df0.set_index(["time"], inplace=True)
        except Exception as e:
            # Deliberately best-effort: callers treat an empty frame as
            # "no readings yet", but leave a trace of why it happened.
            self.logger.debug(f"Could not build readings dataframe: {e!r}")
            df0 = pd.DataFrame([], index=pd.DatetimeIndex([]))

        return df0

    def record(self, collection_name: str = None):
        """Record the rolling mean of the power readings in the database.

        Args:
            collection_name (str): Where to store the results in the db. If None
                (the default), then use `arduino_board_name`.

        Returns:
            dict: The values that were recorded (see `readings`).
        """
        recent_values = self.readings

        collection_name = collection_name or self.arduino_board_name
        self.db.insert_current(collection_name, recent_values, store_permanently=False)

        return recent_values

    def setup_relays(self, relays: dict[str, dict]):
        """Setup the relays.

        Creates a `Relay` per configuration entry, binds the board's control
        methods onto it, and registers it on the board by name and by label.

        Args:
            relays (dict[str, dict] or None): Mapping of relay name (a
                `TruckerRelayIndex` member name such as "RELAY_0") to a dict
                with optional `label` and `default_state` entries. `None` or
                empty configures nothing.

        Raises:
            KeyError: If a relay name or `default_state` string is not a valid
                member name.
        """
        for relay_name, relay_config in (relays or {}).items():
            relay_config = relay_config or {}
            relay_index = TruckerRelayIndex[relay_name]
            # Fall back to the relay name so every relay has a unique,
            # non-empty label to be looked up by.
            relay_label = relay_config.get("label") or relay_name

            # Config loaders may hand back a bool/int for values like `on`.
            raw_state = relay_config.get("default_state", "off")
            if isinstance(raw_state, str):
                default_state = PinState[raw_state.upper()]
            else:
                default_state = PinState(int(raw_state))

            # Create relay object.
            self.logger.debug(f"Creating {relay_label=} for {relay_config!r}")
            relay = Relay(
                name=relay_name,
                label=relay_label,
                relay_index=relay_index,
                default_state=default_state,
            )

            # Add convenience methods on the relay itself.
            setattr(relay, "turn_on", partial(self.turn_on, relay.label))
            setattr(relay, "turn_off", partial(self.turn_off, relay.label))
            setattr(relay, "toggle_relay", partial(self.toggle_relay, relay.label))
            setattr(relay, "cycle_relay", partial(self.cycle_relay, relay.label))

            # Track relays by list and by friendly label.
            self.relays.append(relay)
            self.relay_labels[relay.label] = relay

            # Set an attribute on the board for easy access by name and label.
            setattr(self, relay.name, relay)
            setattr(self, relay.label, relay)

            self.logger.info(f"{relay.label} added to board")

        self.logger.info(f"Relays: {self.relays!r}")

    def change_relay_state(self, relay: Relay, new_state_command: TruckerBoardCommands):
        """Changes the relay to the new state.

        Sends a JSON command with the relay number and command value to the
        board. The local `relay.state` is not changed here; the default reader
        callback updates it from the board's next report.

        Args:
            relay (Relay): The relay to change.
            new_state_command (TruckerBoardCommands): The command to send.
        """
        write_command = to_json(dict(relay=relay.relay_index.value, power=new_state_command.value))
        self.logger.debug(f"Sending relay state change command to board: {write_command!r}")
        self.arduino_board.write(write_command)

    def default_reader_callback(self, data):
        """Parse a JSON line from the Arduino into a list for DataFrame append.

        Args:
            data (str): JSON string produced by the Arduino firmware, containing
                keys for relay states and readings along with ac_ok/battery_low.

        Returns:
            list | None: A list of values in the order
                [timestamp, ac_ok, battery_low, <relay readings...>] suitable for
                constructing/appending to a pandas DataFrame, or None if the
                reading is ignored/invalid.
        """
        name_key = "name"
        relay_key = "relays"
        values_key = "readings"
        ac_key = "ac_ok"
        battery_key = "battery_low"

        # Discard the first few lines received after start-up.
        if self._ignore_readings > 0:
            self._ignore_readings -= 1
            return

        self.logger.trace(f"Received: {data!r}")

        try:
            data = from_json(data)
        except error.InvalidDeserialization as e:
            self.logger.warning(f"Error here: {e!r}")
            return

        # This runs on the serial reader, so reject malformed payloads rather
        # than raising on a missing key.
        if not isinstance(data, dict) or data.get(name_key) != self.arduino_board_name:
            self.logger.warning("Not reading the power_board. Skipping data.")
            return

        # Check we got a valid reading: BOTH arrays must cover every relay.
        relay_states = data.get(relay_key) or []
        relay_values = data.get(values_key) or []
        expected_count = len(TruckerRelayIndex)
        if len(relay_states) != expected_count or len(relay_values) != expected_count:
            self.logger.debug("Did not get a full valid reading")
            return

        # Todo: make sure not receiving stale or out of order data using `uptime`.

        # Create a list for the new data row and add common time and AC reading.
        new_data = [current_time().to_datetime(), data.get(ac_key, -1), data.get(battery_key, -1)]

        for read_relay in self.relays:
            # Index by the relay's board number, not its position in the
            # config, so partial or reordered configurations read correctly.
            board_index = read_relay.relay_index.value

            # Update the state of the pin.
            read_relay.state = PinState(relay_states[board_index])

            # Give a negative value for off rather than zero.
            reading = -1 if read_relay.state == PinState.OFF else relay_values[board_index]

            # Record the new value.
            new_data.append(reading)

        return new_data

    def __str__(self):
        relay_states = " ".join([POWER_SYMBOLS[r.state] for r in self.relays])
        return f"{self.name} [{relay_states}]"

    def __repr__(self):
        return to_json(
            {
                "name": self.name,
                "port": self.port,
                "relays": [dict(name=r.name, label=r.label, state=r.state.name) for r in self.relays],
            }
        )

    @classmethod
    def lookup_port(cls, vendor_id=0x2341, product_id=0x0043, **kwargs):
        """Tries to guess the port hosting the power board arduino.

        The default vendor_id is for official Arduino products. The default
        product_id is for an Uno Rev 3.

        https://github.com/arduino/Arduino/blob/1.8.0/hardware/arduino/avr/boards.txt#L51-L58

        Args:
            vendor_id (int): USB vendor id to match.
            product_id (int): USB product id to match.
            **kwargs: Ignored; lets callers forward unrelated options.

        Returns:
            str | None: The device path, or None if no matching port is found.
        """
        dev_path = None
        with suppress(error.NotFound):
            dev_path = find_serial_port(vendor_id, product_id)

        return dev_path