Source code for panoptes.pocs.utils.cli.sensor
import time
from typing import Optional
import typer
from rich import print
from panoptes.pocs.sensor.remote import RemoteMonitor
from panoptes.pocs.utils.logger import get_logger
app = typer.Typer()
logger = get_logger(stderr_log_level='ERROR')
[docs]
@app.callback()
def main(context: typer.Context):
context.params.update(context.parent.params)
verbose = context.params['verbose']
if verbose:
print(f'Command options from power: {context.params!r}')
[docs]
@app.command()
def monitor(
sensor_name: str,
endpoint: Optional[str] = typer.Option(None, help='The remote endpoint to read. '
'If not provided, use the config key '
'"environment.<sensor_name>.url".'),
store: bool = typer.Option(True, help='If result should be stored in file database.'),
read_frequency: int = typer.Option(60, help='Read frequency in seconds.'),
verbose: bool = False,
):
"""Continuously read remote sensor, optionally storing results."""
remote_monitor = RemoteMonitor(endpoint_url=endpoint, sensor_name=sensor_name)
try:
while True:
result = remote_monitor.capture(store_result=store)
if verbose:
print(result)
time.sleep(read_frequency)
except KeyboardInterrupt:
print(f'[red]Shutting down monitor script for {sensor_name}')
if __name__ == "__main__":
app()