Source code for lvmecp.lights

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2022-05-24
# @Filename: lights.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

from typing import TYPE_CHECKING

from lvmecp.maskbits import LightStatus
from lvmecp.tools import loop_coro


if TYPE_CHECKING:
    from .plc import PLC


__all__ = ["LightsController", "CODE_TO_LIGHT", "CODE_TO_FLAG"]


CODE_TO_LIGHT = {
    "cr": "control_room",
    "sr": "spectrograph_room",
    "ur": "utilities_room",
    "uma": "uma_room",
    "tb": "telescope_bright",
    "tr": "telescope_red",
}

CODE_TO_FLAG = {
    "cr": LightStatus.CONTROL_ROOM,
    "sr": LightStatus.SPECTROGRAPH_ROOM,
    "ur": LightStatus.UTILITY_ROOM,
    "uma": LightStatus.UMA_ROOM,
    "tb": LightStatus.TELESCOPE_BRIGHT,
    "tr": LightStatus.TELESCOPE_RED,
}


[docs]class LightsController: """Controller for the light settings.""" def __init__(self, plc: PLC): self.plc = plc self.client = plc.client self.status = LightStatus(0) self.__update_loop_task = loop_coro(self.update, 10) def __del__(self): self.__update_loop_task.cancel()
[docs] async def update(self): """Refreshes the lights status.""" lights = [dev for dev in self.plc["LIGHTS"].devices if dev.endswith("_status")] values = await self.plc.read_devices(lights, adapt=False) for ii, light in enumerate(lights): code = light.split("_")[0] flag = CODE_TO_FLAG[code] value = values[ii] if value is True: self.status |= flag else: self.status &= ~flag return self.status
[docs] def get_code(self, light: str): """Returns the short-form code for a light. Case-insensitive. Parameters ---------- light The light for which the code is seeked. Examples -------- >>> get_code('telescope_red') 'tr' >>> get_code('telescope bright') 'tb' >>> get_code('uMa Room') 'uma' Raises ------ ValueError When a code cannot be found for the input light. """ light = light.lower() if light in CODE_TO_LIGHT: return light for code, descr in CODE_TO_LIGHT.items(): if light == descr: return code for repl in [" ", "-", ""]: if light == descr.replace("_", repl): return code raise ValueError(f"Cannot find matching code for {light!r}.")
[docs] def get_flag(self, light: str): """Gets the `.LightStatus` flag associated with a light. Parameters ---------- light The light for which the `.LightStatus` a flag is requested. It can be specified in short form (e.g., `tr`), using underscores (`telescope_red`), or spaces (`telescope red`). The light name is case-insensitive. """ code = self.get_code(light) return CODE_TO_FLAG[code]
[docs] async def get_light_status(self, light: str, update: bool = True) -> bool: """Returns the status of a light. Parameters ---------- light The light for which to get the status. update Whether to update the status of all lights before returning the status. If `False`, the last cached status will be used. Returns ------- status `True` if the light is on, `False` otherwise. Raises ------ ValueError If the light is unknown. """ if update: await self.update() flag = self.get_flag(light) return bool(self.status & flag)
[docs] async def get_all(self, update: bool = True) -> dict[str, bool]: """Returns a dictionary with the status of all the lights. Parameters ---------- update Whether to update the status of all lights before returning. If `False`, the last cached status will be used. Returns ------- status A dictionary of light name to status (`True` for on, `False` for off). """ if update: await self.update() status = {} for code, light in CODE_TO_LIGHT.items(): status[light] = bool(self.status & CODE_TO_FLAG[code]) return status
[docs] async def set(self, light: str, action: bool | None = None): """Turns a light on/off. Updates the internal status dictionary. Parameters ---------- light The light to command. action `True` to turn on the light, `False` for off. If `None`, the light status will be switched. Doesn't do anything if the light is already at the desired state. """ current = await self.get_light_status(light, update=True) code_new = self.get_code(light) + "_new" if action is None: action = not current if current == action: return dev_new = self.plc.get_device(code_new) await dev_new.write(action) flag = self.get_flag(light) if action is True: self.status |= flag else: self.status &= ~flag