Source code for socs.agents.acu.hwp_iface

"""
Interface to HWPSupervisor.

"""

import time
from dataclasses import asdict, dataclass, field

from ocs import client_http, ocs_client, site_config

# Helper functions for dealing with el ranges.


def _range_contains(a, b):
    # true if "a contains b" i.e. if b is a subset of a
    return a[0] <= b[0] and a[1] >= b[1]


def _simplify_ranges(*a):
    """Combine and order a set of intervals, to form an ordered,
    non-abutting list of intervals.

    """
    reduced = sorted([tuple(_a) for _a in a])
    i0 = 1
    while i0 < len(reduced):
        a, b = reduced[i0 - 1], reduced[i0]
        if a[1] >= b[0]:
            reduced[i0 - 1] = (a[0], max(b[1], a[1]))
            reduced.pop(i0)
        else:
            i0 += 1
    return reduced


AXIS_NAMES = ['el', 'az', 'third']


[docs] @dataclass class MotionRule: #: Tuple giving the elevation range to which this rule applies. el_range: tuple #: List of grip_state values to which this rule applies. Note "*" #: matches any grip_state. grip_states: list #: List of spin_state values to which this rule applies. Note "*" #: matches any spin_state. spin_states: list #: Dict mapping each axis to a bool that indicates whether moves in #: that axis are permitted. allow_moves: dict
[docs] def test_hwp(self, grip_state, spin_state): """Returns True only if grip_state and spin_state args are matched by this rule. """ if grip_state not in self.grip_states and '*' not in self.grip_states: return False if spin_state not in self.spin_states and '*' not in self.spin_states: return False return True
[docs] @dataclass class HWPInterlocks: #: bool indicating whether the config is valid (and thus eligible #: to be enabled). configured: bool = False #: bool indicating whether to bother with HWP interlocks. enabled: bool = False #: instance_id of HWPSupervisor agent to query. instance_id: str = 'hwp-supervisor' #: amount of leeway in el (deg) to grant when checking rules. tolerance: float = 0.1 #: whether HWP state-based elevation limits should be applied to #: Sun avoidance escape movements limit_sun_avoidance: bool = True #: rules: list of MotionRule objects, each of which grants move #: privs on some axis based on elevation range and HWP state. rules: list = field(default_factory=list) @classmethod def from_dict(cls, cfg): if cfg is None: return cls() self = cls(configured=True, **cfg) self.rules = [MotionRule(**rule) for rule in self.rules] return self def encoded(self, basic=False): d = asdict(self) if basic: d.pop('rules') return d def get_client(self): return HWPSupervisorClient(self.instance_id)
[docs] def test_range(self, el_range, grip_state, spin_state): """Determine what motion types are permitted, given the current HWP state and the elevation (range) of the planned motion. Args: el_range (tuple[float] or None): The starting and ending elevations of the move. grip_state (str): The HWP gripper state. spin_state (str): The HWP spinning state. The rules are evaluated in the context of grip_state and spin_state, to obtain the ranges of els over which moves in each axis is permitted. Then the el_range, if provided is tested against those ranges. E.g for a call like:: test_range((40, 40), 'ungripped', 'not_spinning') the returned structure looks like this:: {'el': (True, (30, 70), [(30, 70)]), 'az': (True, (30, 70), [(30, 70)]), 'third': (False, None, [(30, 50), (60, 70)]), } I.e. for each axis, a tuple is returned where the third value is the full list of permitted elevation ranges for this state; the first value is whether the requested el_range, specifically, overlaps with the elevation ranges, and the second value is the particular range that overlaps with requested el_range (or None if not the case). """ allow_moves = {k: [None, None, []] for k in AXIS_NAMES} for rule in self.rules: if not rule.test_hwp(grip_state, spin_state): continue # For each axis move that is allowed, add the el range # into the allow_moves entry. for k, (_, _, el_ranges) in allow_moves.items(): if rule.allow_moves[k]: el_ranges.append(rule.el_range) for k in allow_moves: allow_moves[k][2] = _simplify_ranges(*allow_moves[k][2]) if el_range is not None: el_range = [min(el_range), max(el_range)] for k in allow_moves.keys(): allow_moves[k][0] = False for _e in allow_moves[k][2]: if _range_contains(_e, el_range): allow_moves[k][0] = True allow_moves[k][1] = _e return {k: tuple(v) for k, v in allow_moves.items()}
[docs] class HWPSupervisorClient: """OCSClient wrapper to query and interpret the state of the HWPSupervisor. Instantiate with the instance_id of the HWPSupervisor to be monitored. """ def __init__(self, instance_id): self.instance_id = instance_id self.cclient = site_config.get_control_client(instance_id)
[docs] def update(self): """Analyze HWPSupervisor state info and make determinations about hwp state. Returns a dict with the results. When everything works well, the useful results in the dict are: - ``timestamp``: current time - ``ok``: True if state determination went reasonably well - ``err_msg``: populated if not ok. - ``grip_state``: one of "unknown", "cold", "warm", "ungripped" - ``spin_state``: one of "unknown", "spinning", "not_spinning" The following raw data from monitor process are copied into the output dict:: hwp_state: is_spinning -> _is_spinning pid_target_freq -> _target_freq gripper: grip_state -> _grip_state brake -> _grip_brakes acu: request_block_motion -> request_block_motion request_block_motion_timestamp -> request_block_motion_timestamp """ ok, err_msg = False, '' hs, acu = {}, {} try: _, _, session = ocs_client.OCSReply(*self.cclient.request('status', 'monitor')) if session is not None: hs = session['data'].get('hwp_state', {}) acu = session['data'].get('acu', {}) ok = True except client_http.ControlClientError as e: err_msg = f'Error getting status: {e}' except Exception as e: err_msg = f'Surprising error: {e}' # Enhanced spin state logic... is_spinning = hs.get('is_spinning') # bool or None target_freq = hs.get('pid_target_freq') # float or None I guess if is_spinning or target_freq not in [None, 0]: spin_state = 'spinning' elif is_spinning is False and target_freq in [0.]: spin_state = 'not_spinning' else: spin_state = 'unknown' # Enhanced gripper state logic... _grip_state = hs.get('gripper', {}).get('grip_state') brakes = hs.get('gripper', {}).get('brake') # 1 1 1 when stable brakes_on = False try: brakes_on = len(brakes) and all(brakes) except BaseException: pass if _grip_state is not None and brakes_on: grip_state = _grip_state else: grip_state = 'unknown' return { 'timestamp': time.time(), 'ok': ok, 'err_msg': err_msg, 'grip_state': grip_state, 'spin_state': spin_state, '_grip_brakes': brakes, '_grip_state': _grip_state, '_is_spinning': is_spinning, '_target_freq': target_freq, 'request_block_motion': acu.get('request_block_motion'), 'request_block_motion_timestamp': acu.get('request_block_motion_timestamp'), }