# Source code for panoptes.pocs.utils.cli.power

"""Typer CLI for interacting with the power monitoring/control service.

Provides commands to query relay status and readings, and to toggle relays via
HTTP calls to the FastAPI service implemented in panoptes.pocs.utils.service.power.
"""

import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated

import numpy as np
import requests
import typer
from rich import print
from rich.console import Console
from sparklines import sparklines

from panoptes.pocs.utils.service.power import RelayCommand

# Module-level rich Console; the setup command uses it to show status spinners
# while the installation scripts run.
console = Console()


@dataclass
class HostInfo:
    """Connection details for the power service.

    Attributes:
        host (str): Hostname or IP address where the power service listens.
        port (str): TCP port of the service, stored as a string so it can be
            dropped straight into the URL.
    """

    host: str = "localhost"
    port: str = "6564"

    @property
    def url(self):
        """Build the base URL of the power service.

        Returns:
            str: ``http://<host>:<port>`` for this instance.
        """
        return "http://{}:{}".format(self.host, self.port)
# Typer application that every command and the callback below register on.
# It is created with no_args_is_help=True (Typer's option for showing the help
# text when the CLI is invoked without arguments).
app = typer.Typer(no_args_is_help=True)
@app.callback()
def common(
    context: typer.Context,
    host: str = typer.Option("localhost", help="Power monitor host address."),
    port: str = typer.Option("6564", help="Power monitor port."),
):
    """Capture the service location once so every sub-command can reuse it.

    The host and port options are bundled into a HostInfo and stored on the
    Typer context; the commands read it back through ``context.obj``.

    Args:
        context: Typer context on which the HostInfo is stored.
        host: Power service host address.
        port: Power service TCP port.

    Returns:
        None
    """
    host_info = HostInfo(host=host, port=port)
    context.obj = host_info
@app.command()
def status(context: typer.Context):
    """Show the current state of each relay reported by the power service.

    Issues a GET against the service base URL and prints one line per entry
    of the returned JSON object. Entries that carry ``label`` and ``state``
    are shown with the state colored green for ``ON`` and red otherwise;
    any other entry is printed as-is with its key upper-cased.

    Args:
        context: Typer context whose ``obj`` holds the HostInfo for the server.

    Returns:
        None
    """
    url = context.obj.url
    try:
        response = requests.get(url)

        # Non-2xx answer: show the server's message and stop.
        if not response.ok:
            print(f"[red]{response.content.decode()}[/red]")
            return

        for index, info in response.json().items():
            try:
                # Format the label first so lookup/format failures surface in
                # the same order as a plain label-then-state access would.
                label = format(info["label"], ".<20s")
                state = info["state"]
                color = "green" if state == "ON" else "red"
                print(f"[{index:.<12s}] {label} [{color}]{state}[/]")
            except (KeyError, TypeError):
                # Entry is not a relay record with label/state; dump it raw.
                print(f"[{index.upper():.<12s}] {str(info):.>23s}")
    except requests.exceptions.ConnectionError:
        print(f"[red]Cannot connect to {url}[/red]")
@app.command()
def readings(context: typer.Context):
    """Show a sparkline and the mean of the recent readings for each relay.

    Issues a GET against ``<base url>/readings``. For every relay in the
    returned JSON object the readings are converted to integers, negative
    values are clamped to zero, and the result is drawn as a sparkline
    followed by the rounded mean in square brackets.

    Args:
        context: Typer context whose ``obj`` holds the HostInfo for the server.

    Returns:
        None
    """
    url = f"{context.obj.url}/readings"
    try:
        response = requests.get(url)

        # Non-2xx answer: show the server's message and stop.
        if not response.ok:
            print(f"[red]{response.content.decode()}[/red]")
            return

        for label, raw_readings in response.json().items():
            heading = f"[cyan]{label:.<20s}[/cyan]"
            # Clamp negative readings to zero before plotting.
            values = [max(int(reading), 0) for reading in raw_readings.values()]
            for line in sparklines(values):
                print(f"{heading} {line} [{np.array(values).mean():.0f}]")
    except requests.exceptions.ConnectionError:
        print(f"[red]Cannot connect to {url}[/red]")
@app.command()
def on(
    context: typer.Context,
    relay: str = typer.Argument(..., help="The label or index of the relay to turn on."),
):
    """Switch a relay on.

    Convenience wrapper that forwards to ``control`` with the ``turn_on``
    command.

    Args:
        context: Typer context carrying the HostInfo for the server.
        relay: Label or index of the relay to switch on.

    Returns:
        None
    """
    action = "turn_on"
    control(context, relay=relay, command=action)
@app.command()
def off(
    context: typer.Context,
    relay: str = typer.Argument(..., help="The label or index of the relay to turn off."),
):
    """Switch a relay off.

    Convenience wrapper that forwards to ``control`` with the ``turn_off``
    command.

    Args:
        context: Typer context carrying the HostInfo for the server.
        relay: Label or index of the relay to switch off.

    Returns:
        None
    """
    action = "turn_off"
    control(context, relay=relay, command=action)
@app.command()
def control(
    context: typer.Context,
    relay: str = typer.Option(..., help="The label or index of the relay to control."),
    command: str = typer.Option(..., help='The control action to perform, either "turn_on" or "turn_off"'),
):
    """Send a control command for one relay to the power service.

    Builds a RelayCommand from the arguments, POSTs its ``model_dump()`` as
    JSON to ``<base url>/control`` and prints the reply: the decoded JSON on
    success, otherwise the raw response text.

    Args:
        context: Typer context carrying the HostInfo for the server.
        relay: Label or index of the relay to control.
        command: Action to perform, either "turn_on" or "turn_off".

    Returns:
        None
    """
    url = f"{context.obj.url}/control"
    try:
        payload = RelayCommand(relay=relay, command=command).model_dump()
        response = requests.post(url, json=payload)
        if response.ok:
            reply = response.json()
        else:
            reply = response.content.decode()
        print(reply)
    except requests.exceptions.ConnectionError:
        print(f"[red]Cannot connect to {url}")
@app.command()
def restart():
    """Restart the power server process via supervisorctl.

    Prints the command line and then runs it through the shell. The exit
    status of the command is not inspected.

    Returns:
        None
    """
    restart_command = "supervisorctl restart pocs-power-monitor"
    print(f"Running: {restart_command}")
    subprocess.run(restart_command, shell=True)
@app.command(name="setup")
def setup_power(
    confirm: Annotated[
        bool,
        typer.Option(
            ...,
            "--confirm",
            prompt="Are you sure you want to setup the power board?",
            help="Confirm power board setup.",
        ),
    ] = False,
    arduino_device: str = typer.Option("/dev/ttyACM0", help="Path to the Arduino device."),
):
    """Set up the power board port and labels; optionally install Arduino sketch.

    Runs two helper scripts from ``~/resources/arduino`` in order: the
    Arduino CLI installer, then the power board installer. The second script
    receives a ``DO_UPLOAD`` environment variable set to ``"true"`` when
    ``arduino_device`` exists and ``"false"`` otherwise. Output of a failing
    script is printed in red and the command stops.

    Args:
        confirm: Confirmation flag to proceed with setup.
        arduino_device: Path to the Arduino device for uploading the sketch.

    Returns:
        None

    Raises:
        typer.Abort: If the user does not confirm the setup.
    """
    if not confirm:
        print("[red]Cancelled.[/red]")
        # Abort is an exception class: it has to be raised for Typer to stop
        # the command; returning the instance would silently discard it.
        raise typer.Abort()

    arduino_cli_script = Path("~/resources/arduino/install-arduino-cli.sh").expanduser()
    power_board_script = Path("~/resources/arduino/install-power-board.sh").expanduser()

    # NOTE: this only bails out when BOTH scripts are missing; a single missing
    # script is reported through the non-zero exit status of bash below.
    if not arduino_cli_script.exists() and not power_board_script.exists():
        print(
            f"[red]Error: Neither Arduino CLI setup script nor power board setup script found at "
            f"{arduino_cli_script} and {power_board_script}. Cannot proceed with setup.[/red]"
        )
        return None

    with console.status(f"Running Arduino CLI setup script: {arduino_cli_script}", spinner="dots"):
        # Argument list instead of a shell string, so a path containing spaces
        # or shell metacharacters is passed to bash verbatim.
        result = subprocess.run(["bash", str(arduino_cli_script)], capture_output=True, text=True)
        if result.returncode != 0:
            print(f"[red]{result.stdout}\n{result.stderr}[/red]")
            return None
        print("[green]✅ Arduino CLI setup complete.[/green]")

    # Only ask the script to upload the sketch when the device node exists.
    device_found = Path(arduino_device).exists()

    env = os.environ.copy()
    env["DO_UPLOAD"] = "true" if device_found else "false"

    upload_msg = "with upload" if device_found else "without upload (device not found)"
    with console.status(
        f"Running power board setup script: {power_board_script} ({upload_msg})", spinner="dots"
    ):
        result = subprocess.run(
            ["bash", str(power_board_script)], capture_output=True, text=True, env=env
        )
        if result.returncode != 0:
            print(f"[red]{result.stdout}\n{result.stderr}[/red]")
            return None

        print("[green]✅ Power board script complete.[/green]")
        # Tell the user when the upload step was skipped so they know to
        # re-run the command once the board is connected.
        if not device_found:
            print(
                f"[yellow]⚠️ Sketch was compiled but not uploaded because "
                f"device {arduino_device} was not found. Please connect the "
                f"device and run this command again to upload the sketch."
                f"[/yellow]"
            )

    return None
# Run the Typer application when this module is executed as a script.
if __name__ == "__main__": app()