"""Power board sensor interface for relays and current/voltage readings.
Provides PowerBoard, a serial-connected reader/controller for the PANOPTES
power relay board (Arduino + relay shield), and a small Relay data class for
labels/states. Includes helpers to toggle relays and to parse/aggregate
telemetry into rolling means and status dictionaries.
"""
import time
from collections.abc import Callable
from contextlib import suppress
from dataclasses import dataclass
from enum import IntEnum
from functools import partial
import pandas as pd
from astropy import units as u
from panoptes.utils import error
from panoptes.utils.serial.device import SerialDevice, find_serial_port
from panoptes.utils.serializers import from_json, to_json
from panoptes.utils.time import current_time
from panoptes.pocs.base import PanBase
[docs]
class PinState(IntEnum):
"""Represents a HIGH or LOW state for a digital output pin."""
OFF = 0
LOW = 0
ON = 1
HIGH = 1
POWER_SYMBOLS = {PinState.OFF: "⭘", PinState.ON: "⏽"}
[docs]
class TruckerBoardCommands(IntEnum):
"""The Trucker Board can accept a series of commands for controlling the relays"""
OFF = 0
ON = 1
TOGGLE = 2 # Toggle relay.
CYCLE_DELAY = 3 # Cycle the current state with a 30 second delay.
[docs]
class TruckerRelayIndex(IntEnum):
"""The available relays on the Trucker Board"""
RELAY_0 = 0
RELAY_1 = 1
RELAY_2 = 2
RELAY_3 = 3
RELAY_4 = 4
[docs]
@dataclass
class Relay:
"""Relay data class"""
name: str
relay_index: TruckerRelayIndex
label: str | None
state: PinState | None = PinState.OFF
default_state: PinState | None = PinState.OFF
[docs]
def turn_on(self):
"""Turn this relay ON (helper bound by PowerBoard.setup_relays)."""
pass
[docs]
def turn_off(self):
"""Turn this relay OFF (helper bound by PowerBoard.setup_relays)."""
pass
[docs]
def toggle_relay(self):
"""Toggle this relay from OFF to ON or vice versa."""
pass
[docs]
def cycle_relay(self):
"""Cycle this relay (toggle with a small delay), as supported by firmware."""
pass
def __str__(self):
return f"[{self.name}] {self.label} {self.state.name}"
[docs]
class PowerBoard(PanBase):
"""Power distribution and monitoring.
This represents a "trucker" board for PANOPTES, which is a combination of an
Arduino Uno and an Infineon 24V relay shield.
The relay shield has three PROFETs on them that are capable of sensing the
current through the relay. RELAY_4 has a dedicated PROFET. The other two
PROFETs can switch between two relays depending on the status of the appropriate
DSEL pin.
This class uses `panoptes.utils.device.serial.SerialDevice` for threaded (async)
reading of the values from the Arduino Uno connected to the power board, which
are parsed by the default callback and appended to a `deque` whose size can
be controlled. A custom callback can be passed that should accept a single
string parameter and return a dictionary.
Pin names specified above correspond to Infineon terminology. See manual:
https://bit.ly/2IGgWLQ.
"""
def __init__(
self,
port: str = None,
name: str = "Power Board",
relays: dict[str, dict] = None,
reader_callback: Callable[[dict], dict] = None,
mean_interval: int | None = 5,
arduino_board_name: str = "power_board",
*args,
**kwargs,
):
"""Initialize the power board.
The `relays` should be a dictionary with the relay name as key and a
dict with `label` and `initial_state` entries. Example:
RELAY_0:
label: mount
default_state: on
Args:
port (str, optional): The dev port for the arduino, if not provided, search for port
matching the vendor (2341) and product id (0043).
name (str): The user-friendly name for the power board.
relays (dict[Relay] or None): The relay configuration. See notes for details.
reader_callback (Callable): A callback for the serial readings. The
default callback will update the pin values and record data in a
json format, which is then made into a dataframe with the `to_dataframe`.
mean_interval (int): When taking a rolling mean, use this many seconds,
default 5.
arduino_board_name (str): The name of the arduino board to match in
the callback and the collection name for storing in `record.
"""
super().__init__(*args, **kwargs)
if port is None:
port = PowerBoard.lookup_port(**kwargs)
if port is None:
raise error.NotFound("Failed to automatically find port for PowerBoard.")
self.logger.info(f"Guessing that arduino is on {port=}")
self.port = port
self.name = name
self.arduino_board_name = "power_board"
reader_callback = reader_callback or self.default_reader_callback
self.logger.debug(f"Setting up Power board connection for {name=} on {self.port}")
self._ignore_readings = 5
self.arduino_board = SerialDevice(
port=self.port,
serial_settings=dict(baudrate=9600),
reader_callback=reader_callback,
name=arduino_board_name,
)
self.relays: list[Relay] = list()
self.relay_labels: dict[str, Relay] = dict()
self.setup_relays(relays)
time.sleep(2)
# Set initial relay states.
for relay in self.relays:
self.logger.info(f"Setting {relay.label} to {relay.default_state.name}")
self.change_relay_state(relay, TruckerBoardCommands(relay.default_state))
self._mean_interval = mean_interval
self.logger.info("Power board initialized")
@property
def status(self):
"""Summarize relay states and latest readings.
Returns:
dict: Mapping of relay name to a dict with label, state, and reading,
plus top-level 'ac_ok' and 'battery_low' booleans if available.
"""
readings = self.readings
if not readings:
self.logger.warning("No readings available. If system just started please wait a moment.")
return {}
status = {
r.name: dict(label=r.label, state=r.state.name, reading=readings[r.label]) for r in self.relays
}
status["ac_ok"] = readings["ac_ok"]
status["battery_low"] = readings["battery_low"]
return status
@property
def readings(self):
"""Return the rolling mean of the readings."""
time_start = (current_time() - self._mean_interval * u.second).to_datetime()
df = self.to_dataframe()[time_start:]
if len(df) == 0:
return {}
values = df.mean().astype("int").to_dict()
# Add the most recent ac_ok and battery_low check.
try:
values["ac_ok"] = bool(df.iloc[-1]["ac_ok"])
values["battery_low"] = bool(df.iloc[-1]["battery_low"])
except (KeyError, IndexError):
values["ac_ok"] = None
values["battery_low"] = None
return values
[docs]
def turn_on(self, label):
"""Turns on the relay with the given label."""
self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.ON)
[docs]
def turn_off(self, label):
"""Turns off the relay with the given label."""
self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.OFF)
[docs]
def toggle_relay(self, label):
"""Toggle relay from OFF<->ON"""
self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.TOGGLE)
[docs]
def cycle_relay(self, label):
"""Cycle the relay with a default 5-second delay."""
self.change_relay_state(self.relay_labels[label], TruckerBoardCommands.CYCLE_DELAY)
[docs]
def to_dataframe(self, **kwargs):
"""Make a dataframe from the latest readings."""
try:
columns = ["time", "ac_ok", "battery_low"] + list(self.relay_labels.keys())
df0 = pd.DataFrame(self.arduino_board.readings, columns=columns)
df0.set_index(["time"], inplace=True)
except Exception:
df0 = pd.DataFrame([], index=pd.DatetimeIndex([]))
return df0
[docs]
def record(self, collection_name: str = None):
"""Record the rolling mean of the power readings in the database.
Args:
collection_name (str): Where to store the results in the db. If None
(the default), then use `arduino_board_name`.
"""
recent_values = self.readings
collection_name = collection_name or self.arduino_board_name
self.db.insert_current(collection_name, recent_values, store_permanently=False)
return recent_values
[docs]
def setup_relays(self, relays: dict[str, dict]):
"""Setup the relays."""
for relay_name, relay_config in relays.items():
relay_index = TruckerRelayIndex[relay_name]
relay_label = relay_config.get("label") or relay_name
default_state = PinState[relay_config.get("default_state", "off").upper()]
# Create relay object.
self.logger.debug(f"Creating {relay_label=} for {relay_config!r}")
relay = Relay(
name=relay_name,
label=relay_config.get("label", ""),
relay_index=relay_index,
default_state=default_state,
)
# Add convenience methods on the relay itself.
setattr(relay, "turn_on", partial(self.turn_on, relay.label))
setattr(relay, "turn_off", partial(self.turn_off, relay.label))
setattr(relay, "toggle_relay", partial(self.toggle_relay, relay.label))
setattr(relay, "cycle_relay", partial(self.cycle_relay, relay.label))
# Track relays by list and by friendly label.
self.relays.append(relay)
self.relay_labels[relay.label] = relay
# Set an attribute on the board for easy access by name and label.
setattr(self, relay.name, relay)
setattr(self, relay.label, relay)
self.logger.info(f"{relay.label} added to board")
self.logger.info(f"Relays: {self.relays!r}")
[docs]
def change_relay_state(self, relay: Relay, new_state_command: TruckerBoardCommands):
"""Changes the relay to the new state."""
write_command = to_json(dict(relay=relay.relay_index.value, power=new_state_command.value))
self.logger.debug(f"Sending relay state change command to board: {write_command!r}")
self.arduino_board.write(f"{write_command}")
[docs]
def default_reader_callback(self, data):
"""Parse a JSON line from the Arduino into a list for DataFrame append.
Args:
data (str): JSON string produced by the Arduino firmware, containing
keys for relay states and readings along with ac_ok/battery_low.
Returns:
list | None: A list of values in the order [timestamp, ac_ok, battery_low,
<relay readings...>] suitable for constructing/appending to a pandas
DataFrame, or None if the reading is ignored/invalid.
"""
name_key = "name"
relay_key = "relays"
values_key = "readings"
ac_key = "ac_ok"
battery_key = "battery_low"
if self._ignore_readings > 0:
self._ignore_readings -= 1
return
self.logger.trace(f"Received: {data!r}")
try:
data = from_json(data)
except error.InvalidDeserialization as e:
self.logger.warning(f"Error here: {e!r}")
return
if data[name_key] != self.arduino_board_name:
self.logger.warning("Not reading the power_board. Skipping data.")
return
# Check we got a valid reading.
if len(data[relay_key]) != len(TruckerRelayIndex) and len(data[values_key]) != len(TruckerRelayIndex):
self.logger.debug("Did not get a full valid reading")
return
# Todo: make sure not receiving stale or out of order data using `uptime`.
# Create a list for the new data row and add common time and AC reading.
new_data = [current_time().to_datetime(), data.get(ac_key, -1), data.get(battery_key, -1)]
for relay_index, read_relay in enumerate(self.relays):
# Update the state of the pin.
read_relay.state = PinState(data[relay_key][relay_index])
if read_relay.state == PinState.OFF:
# Give a negative value for off rather than zero.
data[values_key][relay_index] = -1
# Record the new value.
new_data.append(data[values_key][relay_index])
return new_data
def __str__(self):
relay_states = " ".join([POWER_SYMBOLS[r.state] for r in self.relays])
return f"{self.name} [{relay_states}]"
def __repr__(self):
return to_json(
{
"name": self.name,
"port": self.port,
"relays": [dict(name=r.name, label=r.label, state=r.state.name) for r in self.relays],
}
)
[docs]
@classmethod
def lookup_port(cls, vendor_id=0x2341, product_id=0x0043, **kwargs):
"""Tries to guess the port hosting the power board arduino.
The default vendor_id is for official Arduino products. The default product_id
is for an Uno Rev 3.
https://github.com/arduino/Arduino/blob/1.8.0/hardware/arduino/avr/boards.txt#L51-L58
"""
dev_path = None
with suppress(error.NotFound):
dev_path = find_serial_port(vendor_id, product_id)
return dev_path