Source code for socs.agents.acu.agent

import argparse
import random
import struct
import time
from enum import Enum

import numpy as np
import ocs
import soaculib as aculib
import soaculib.status_keys as status_keys
import twisted.web.client as tclient
import yaml
from autobahn.twisted.util import sleep as dsleep
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from soaculib.twisted_backend import TwistedHttpBackend
from twisted.internet import protocol, reactor, threads
from twisted.internet.defer import DeferredList, inlineCallbacks

from socs.agents.acu import avoidance
from socs.agents.acu import drivers as sh
from socs.agents.acu import exercisor

#: The number of free ProgramTrack positions, when stack is empty.
FULL_STACK = 10000


#: Maximum update time (in s) for "monitor" process data, even with no changes
MONITOR_MAX_TIME_DELTA = 2.

#: Default scan params by platform type
DEFAULT_SCAN_PARAMS = {
    'ccat': {
        'az_speed': 2,
        'az_accel': 1,
    },
    'satp': {
        'az_speed': 1,
        'az_accel': 1,
    },
}


#: Default Sun avoidance params by platform type (enabled, policy). If
#: either of escape *or* monitoring is enabled, then the full policy
#: must be specified.
SUN_CONFIGS = {
    'ccat': {
        'enabled': False,
        'policy': {
            'exclusion_radius': 20,
            'el_horizon': 10,
            'min_sun_time': 1800,
            'response_time': 7200,
        },
    },
    'satp': {
        'enabled': True,
        'policy': {
            'exclusion_radius': 20,
            'el_horizon': 10,
            'min_sun_time': 1800,
            'response_time': 7200,
        },
    },
}

#: How often to refresh to Sun Safety map (valid up to 2x this time)
SUN_MAP_REFRESH = 6 * avoidance.HOUR


[docs] class ACUAgent: """Agent to acquire data from an ACU and control telescope pointing with the ACU. Parameters: acu_config (str): The configuration for the ACU, as referenced in aculib.configs. Default value is 'guess'. exercise_plan (str): The full path to a scan config file describing motions to cycle through on the ACU. If this is None, the associated process and feed will not be registered. startup (bool): If True, immediately start the main monitoring processes for status and UDP data. ignore_axes (list of str): List of axes to "ignore". "ignore" means that the axis will not be commanded. If a user requests an action that would otherwise move the axis, it is not moved but the action is assumed to have succeeded. The values in this list should be drawn from "az", "el", and "third". disable_idle_reset (bool): If True, don't auto-start idle_reset process for LAT. min_el (float): If not None, override the default configured elevation lower limit. max_el (float): If not None, override the default configured elevation upper limit. avoid_sun (bool): If set, override the default Sun avoidance setting (i.e. force enable or disable the feature). fov_radius (float): If set, override the default Sun avoidance radius (i.e. the radius of the field of view, in degrees, to use for Sun avoidance purposes). named_positions (list of str): Names and target positions to register for use with go_to_named task. If provided, each entry in the list should be of the form "name=az,el" with az and el parseable as floats; e.g. "home=180,45.1". """ def __init__(self, agent, acu_config='guess', exercise_plan=None, startup=False, ignore_axes=None, disable_idle_reset=False, min_el=None, max_el=None, avoid_sun=None, fov_radius=None, named_positions=None): self.log = agent.log # Separate locks for exclusive access to az/el, and boresight motions. self.azel_lock = TimeoutLock() self.boresight_lock = TimeoutLock() self.acu_config_name = acu_config self.acu_config = aculib.guess_config(acu_config) self.sleeptime = self.acu_config['motion_waittime'] self.udp = self.acu_config['streams']['main'] self.udp_schema = aculib.get_stream_schema(self.udp['schema']) self.udp_ext = self.acu_config['streams']['ext'] self.acu8100 = self.acu_config['status']['status_name'] # There may or may not be a special 3rd axis dataset that # needs to be probed. self.acu3rdaxis = self.acu_config['status'].get('3rdaxis_name') self.monitor_fields = status_keys.status_fields[self.acu_config['platform']]['status_fields'] self.motion_limits = self.acu_config['motion_limits'] if min_el: self.log.warn(f'Override: min_el={min_el}') self.motion_limits['elevation']['lower'] = min_el if max_el: self.log.warn(f'Override: max_el={max_el}') self.motion_limits['elevation']['upper'] = max_el # This initializes self.scan_params; these become the default # scan params when calling generate_scan. They can be changed # during run time; they can also be overridden when calling # generate_scan. self.scan_params = {} self._set_default_scan_params() startup_idle_reset = (self.acu_config['platform'] in ['lat', 'ccat'] and not disable_idle_reset) if ignore_axes is None: ignore_axes = [] assert all([x in ['az', 'el', 'third'] for x in ignore_axes]) self.ignore_axes = ignore_axes if len(self.ignore_axes): agent.log.warn('User requested ignore_axes={i}', i=self.ignore_axes) self._reset_sun_params(enabled=avoid_sun, radius=fov_radius) self.named_positions = {} if named_positions: for text in named_positions: try: name, target = text.split('=') _az, _el = target.split(',') self.named_positions[name] = (float(_az), float(_el)) except Exception as e: agent.log.error('Failed to parse named_position string: {text}', text=text) raise e self.exercise_plan = exercise_plan # self.data provides a place to reference data from the monitors. # 'status' is populated by the monitor operation # 'broadcast' is populated by the udp_monitor operation self.data = {'status': {'summary': {}, 'position_errors': {}, 'axis_limits': {}, 'axis_faults_errors_overages': {}, 'axis_warnings': {}, 'axis_failures': {}, 'axis_state': {}, 'osc_alarms': {}, 'commands': {}, 'ACU_failures_errors': {}, 'platform_status': {}, 'ACU_emergency': {}, 'corotator': {}, }, 'broadcast': {}, } self.agent = agent self.take_data = False tclient._HTTP11ClientFactory.noisy = False # Structure for the broadcast process to communicate state to # the monitor process, for a data quality feed. self._broadcast_qual = { 'timestamp': time.time(), 'active': False, 'time_offset': 0, } self.acu_control = aculib.AcuControl( acu_config, backend=TwistedHttpBackend(persistent=False)) self.acu_read = aculib.AcuControl( acu_config, backend=TwistedHttpBackend(persistent=True), readonly=True) agent.register_process('monitor', self.monitor, self._simple_process_stop, blocking=False, startup=startup) agent.register_process('broadcast', self.broadcast, self._simple_process_stop, blocking=False, startup=startup) agent.register_process('monitor_sun', self.monitor_sun, self._simple_process_stop, blocking=False, startup=startup) agent.register_process('generate_scan', self.generate_scan, self._simple_process_stop, blocking=False, startup=False) agent.register_process('idle_reset', self.idle_reset, self._simple_process_stop, blocking=False, startup=startup_idle_reset) basic_agg_params = {'frame_length': 60} fullstatus_agg_params = {'frame_length': 60, 'exclude_influx': True, 'exclude_aggregator': False } influx_agg_params = {'frame_length': 60, 'exclude_influx': False, 'exclude_aggregator': True } self.stop_times = {'az': float('nan'), 'el': float('nan'), 'bs': float('nan'), } self.agent.register_feed('acu_status', record=True, agg_params=fullstatus_agg_params, buffer_time=1) self.agent.register_feed('acu_status_influx', record=True, agg_params=influx_agg_params, buffer_time=1) self.agent.register_feed('acu_commands_influx', record=True, agg_params=influx_agg_params, buffer_time=1) self.agent.register_feed('acu_udp_stream', record=True, agg_params=fullstatus_agg_params, buffer_time=1) self.agent.register_feed('acu_broadcast_influx', record=True, agg_params=influx_agg_params, buffer_time=1) self.agent.register_feed('acu_error', record=True, agg_params=basic_agg_params, buffer_time=1) self.agent.register_feed('sun', record=True, agg_params=basic_agg_params, buffer_time=0) self.agent.register_feed('data_qual', record=True, agg_params=basic_agg_params, buffer_time=0) agent.register_task('go_to', self.go_to, blocking=False, aborter=self._simple_task_abort) agent.register_task('go_to_named', self.go_to_named, blocking=False) agent.register_task('set_scan_params', self.set_scan_params, blocking=False) agent.register_task('fromfile_scan', self.fromfile_scan, blocking=False, aborter=self._simple_task_abort) agent.register_task('set_boresight', self.set_boresight, blocking=False, aborter=self._simple_task_abort) agent.register_task('set_speed_mode', self.set_speed_mode, blocking=False) agent.register_task('stop_and_clear', self.stop_and_clear, blocking=False) agent.register_task('clear_faults', self.clear_faults, blocking=False) agent.register_task('update_sun', self.update_sun, blocking=False) agent.register_task('escape_sun_now', self.escape_sun_now, blocking=False, aborter=self._simple_task_abort) # Automatic exercise program... if exercise_plan: agent.register_process( 'exercise', self.exercise, self._simple_process_stop, stopper_blocking=False) # Use longer default frame length ... very low volume feed. self.agent.register_feed('activity', record=True, buffer_time=0, agg_params={ 'frame_length': 600, }) @inlineCallbacks def _simple_task_abort(self, session, params): # Trigger a task abort by updating state to "stopping" yield session.set_status('stopping') @inlineCallbacks def _simple_process_stop(self, session, params): # Trigger a process stop by updating state to "stopping" yield session.set_status('stopping')
[docs] @ocs_agent.param('_') @inlineCallbacks def idle_reset(self, session, params): """idle_reset() **Process** - To prevent LAT from going into Survival mode, do something on the command interface every so often. (The default inactivity timeout is 1 minute.) """ IDLE_RESET_TIMEOUT = 60 # The watchdog timeout in ACU next_action = 0 while session.status in ['starting', 'running']: if time.time() < next_action: yield dsleep(IDLE_RESET_TIMEOUT / 10) continue success = True try: yield self.acu_control.http.Values(self.acu8100) except Exception as e: self.log.info(' -- failed to reset Idle Stow time: {err}', err=e) success = False session.data.update({ 'timestamp': time.time(), 'reset_ok': success}) if not success: next_action = time.time() + 4 else: next_action = time.time() + IDLE_RESET_TIMEOUT / 2 return True, 'Process "idle_reset" exited cleanly.'
[docs] @inlineCallbacks def monitor(self, session, params): """monitor() **Process** - Refresh the cache of SATP ACU status information and report it on the 'acu_status' and 'acu_status_influx' HK feeds. Summary parameters are ACU-provided time code, Azimuth mode, Azimuth position, Azimuth velocity, Elevation mode, Elevation position, Elevation velocity, Boresight mode, and Boresight position. The session.data of this process is a nested dictionary. Here's an example:: { "StatusDetailed": { "Time": 81.661170959322, "Year": 2023, "Azimuth mode": "Stop", "Azimuth commanded position": -20.0012, "Azimuth current position": -20.0012, "Azimuth current velocity": 0.0002, "Azimuth average position error": 0, "Azimuth peak position error": 0, "Azimuth computer disabled": false, ... }, "Status3rdAxis": { "3rd axis Mode": "Stop", "3rd axis commanded position": 77, "3rd axis current position": 77, "3rd axis computer disabled": "No Fault", ... }, "StatusResponseRate": 19.237531827325963, "PlatformType": "satp", "IgnoredAxes": [], "NamedPositions": { "home": [ 180, 40 ] }, "DefaultScanParams": { "az_speed": 2.0, "az_accel": 1.0, }, "connected": True, } In the case of an SATP, the Status3rdAxis is not populated (the Boresight info can be found in StatusDetailed). In the case of the LAT, the corotator info is queried separately and stored under Status3rdAxis. """ # Note that session.data will get scanned, to assign data to # feed blocks. We make an explicit list of items to ignore # during that scan (not_data_keys). session.data = {'PlatformType': self.acu_config['platform'], 'DefaultScanParams': self.scan_params, 'StatusResponseRate': 0., 'IgnoredAxes': self.ignore_axes, 'NamedPositions': self.named_positions, 'connected': False} not_data_keys = list(session.data.keys()) last_complaint = 0 while True: try: version = yield self.acu_read.http.Version() break except Exception as e: if time.time() - last_complaint > 3600: errormsg = {'aculib_error_message': str(e)} self.log.error(str(e)) self.log.error('monitor process failed to query version! Will keep trying.') last_complaint = time.time() yield dsleep(10) self.log.info(version) session.data['connected'] = True # Numbering as per ICD. mode_key = { 'Stop': 0, 'Preset': 1, 'ProgramTrack': 2, 'Rate': 3, 'SectorScan': 4, 'SearchSpiral': 5, 'SurvivalMode': 6, 'StepTrack': 7, 'GeoSync': 8, 'OPT': 9, 'TLE': 10, 'Stow': 11, 'StarTrack': 12, 'SunTrack': 13, 'MoonTrack': 14, 'I11P': 15, 'AutoTrack/Preset': 16, 'AutoTrack/PositionMemory': 17, 'AutoTrack/PT': 18, 'AutoTrack/OPT': 19, 'AutoTrack/PT/Search': 20, 'AutoTrack/TLE': 21, 'AutoTrack/TLE/Search': 22, # Currently we do not have ICD values for these, but they # are included in the output of Meta. ElSync, at least, # is a known third axis mode for the LAT. 'ElSync': 100, 'UnStow': 101, 'MaintenanceStow': 102, } # fault_key digital values taken from ICD (correspond to byte-encoding) fault_key = { 'No Fault': 0, 'Warning': 1, 'Fault': 2, 'Critical': 3, 'No Data': 4, 'Latched Fault': 5, 'Latched Critical Fault': 6, } pin_key = { # Capitalization matches strings in ACU binary, not ICD. # Are these needed for the SAT still? 'Any Moving': 0, 'All Inserted': 1, 'All Retracted': 2, 'Failure': 3, } lat_pin_key = { # From "meta" output. 'Moving': 0, 'Inserted': 1, 'Retracted': 2, 'Error': 3, } tfn_key = {'None': float('nan'), 'False': 0, 'True': 1, } report_t = time.time() report_period = 20 n_ok = 0 min_query_period = 0.05 # Seconds query_t = 0 prev_checkdata = {'ctime': time.time(), 'Azimuth_mode': None, 'Elevation_mode': None, 'Boresight_mode': None, 'Corotator_mode': None, } j = yield self.acu_read.http.Values(self.acu8100) if self.acu3rdaxis: j2 = yield self.acu_read.http.Values(self.acu3rdaxis) else: j2 = {} session.data.update({'StatusDetailed': j, 'Status3rdAxis': j2, 'StatusResponseRate': n_ok / (query_t - report_t)}) qual_pacer = Pacemaker(.1) was_remote = False last_resp_rate = None data_blocks = {} influx_blocks = {} while session.status in ['running']: now = time.time() if now - query_t < min_query_period: yield dsleep(min_query_period - (now - query_t)) query_t = time.time() if query_t > report_t + report_period: resp_rate = n_ok / (query_t - report_t) if last_resp_rate is None or (abs(resp_rate - last_resp_rate) > max(0.1, last_resp_rate * .01)): self.log.info('Data rate for "monitor" stream is now %.3f Hz' % (resp_rate)) last_resp_rate = resp_rate report_t = query_t n_ok = 0 session.data.update({'StatusResponseRate': resp_rate}) if qual_pacer.next_sample <= time.time(): # Publish UDP data health feed qual_pacer.sleep() # should be instantaneous, just update counters bq = self._broadcast_qual bq_offset = bq['time_offset'] if bq_offset is None: bq_offset = 0. bq_ok = (bq['active'] and (now - bq['timestamp'] < 5) and abs(bq_offset) < 1.) block = { 'timestamp': time.time(), 'block_name': 'qual0', 'data': { 'Broadcast_stream_ok': int(bq_ok), 'Broadcast_recv_offset': bq_offset, } } self.agent.publish_to_feed('data_qual', block) try: j = yield self.acu_read.http.Values(self.acu8100) if self.acu3rdaxis: j2 = yield self.acu_read.http.Values(self.acu3rdaxis) else: j2 = {} session.data.update({'StatusDetailed': j, 'Status3rdAxis': j2, 'connected': True}) n_ok += 1 last_complaint = 0 except Exception as e: if now - last_complaint > 3600: errormsg = {'aculib_error_message': str(e)} self.log.error(str(e)) acu_error = {'timestamp': time.time(), 'block_name': 'ACU_error', 'data': errormsg } self.agent.publish_to_feed('acu_error', acu_error) last_complaint = time.time() session.data['connected'] = False yield dsleep(1) continue for k, v in session.data.items(): if k in not_data_keys: continue for (key, value) in v.items(): for category in self.monitor_fields: if key in self.monitor_fields[category]: if isinstance(value, bool): self.data['status'][category][self.monitor_fields[category][key]] = int(value) elif isinstance(value, int) or isinstance(value, float): self.data['status'][category][self.monitor_fields[category][key]] = value elif value is None: self.data['status'][category][self.monitor_fields[category][key]] = float('nan') else: self.data['status'][category][self.monitor_fields[category][key]] = str(value) self.data['status']['summary']['ctime'] =\ sh.timecode(self.data['status']['summary']['Time']) if self.data['status']['platform_status']['Remote_mode'] == 0: if was_remote: was_remote = False self.log.warn('ACU in local mode!') elif not was_remote: was_remote = True self.log.warn('ACU now in remote mode.') if self.data['status']['summary']['ctime'] == prev_checkdata['ctime']: self.log.warn('ACU time has not changed from previous data point!') # Alert on any axis mode change. for axis_mode in prev_checkdata.keys(): if 'mode' not in axis_mode: continue v = self.data['status']['summary'].get(axis_mode) if v is None: v = self.data['status']['corotator'].get(axis_mode) if v != prev_checkdata[axis_mode]: self.log.info('{axis_mode} is now "{v}"', axis_mode=axis_mode, v=v) prev_checkdata[axis_mode] = v # influx_blocks are constructed based on refers to all # other self.data['status'] keys. Do not add more keys to # any self.data['status'] categories beyond this point new_influx_blocks = {} for category in self.data['status']: new_influx_blocks[category] = { 'timestamp': self.data['status']['summary']['ctime'], 'block_name': category, 'data': {}} if category != 'commands': for statkey, statval in self.data['status'][category].items(): if isinstance(statval, float): influx_val = statval elif isinstance(statval, str): for key_map in [tfn_key, mode_key, fault_key, pin_key, lat_pin_key]: if statval in key_map: influx_val = key_map[statval] break else: raise ValueError('Could not convert value for %s="%s"' % (statkey, statval)) elif isinstance(statval, int): if statkey in ['Year', 'Free_upload_positions']: influx_val = float(statval) else: influx_val = int(statval) new_influx_blocks[category]['data'][statkey + '_influx'] = influx_val else: # i.e. category == 'commands': if str(self.data['status']['commands']['Azimuth_commanded_position']) != 'nan': acucommand_az = {'timestamp': self.data['status']['summary']['ctime'], 'block_name': 'ACU_commanded_positions_az', 'data': {'Azimuth_commanded_position_influx': self.data['status']['commands']['Azimuth_commanded_position']} } self.agent.publish_to_feed('acu_commands_influx', acucommand_az) if str(self.data['status']['commands']['Elevation_commanded_position']) != 'nan': acucommand_el = {'timestamp': self.data['status']['summary']['ctime'], 'block_name': 'ACU_commanded_positions_el', 'data': {'Elevation_commanded_position_influx': self.data['status']['commands']['Elevation_commanded_position']} } self.agent.publish_to_feed('acu_commands_influx', acucommand_el) if self.acu_config['platform'] == 'satp': if str(self.data['status']['commands']['Boresight_commanded_position']) != 'nan': acucommand_bs = {'timestamp': self.data['status']['summary']['ctime'], 'block_name': 'ACU_commanded_positions_boresight', 'data': {'Boresight_commanded_position_influx': self.data['status']['commands']['Boresight_commanded_position']} } self.agent.publish_to_feed('acu_commands_influx', acucommand_bs) # Only keep blocks that have changed or have new data. block_keys = list(new_influx_blocks.keys()) for k in block_keys: if k not in influx_blocks: continue B, N = influx_blocks[k], new_influx_blocks[k] if N['timestamp'] - B['timestamp'] > MONITOR_MAX_TIME_DELTA: continue if any([B['data'][_k] != _v for _k, _v in N['data'].items()]): continue del new_influx_blocks[k] for block in new_influx_blocks.values(): # Check that we have data (commands and corotator often don't) if len(block['data']) > 0: self.agent.publish_to_feed('acu_status_influx', block) influx_blocks.update(new_influx_blocks) # Assemble data for aggregator ... new_blocks = {} for block_name, data_key in [ ('ACU_summary_output', 'summary'), ('ACU_axis_faults', 'axis_faults_errors_overages'), ('ACU_position_errors', 'position_errors'), ('ACU_axis_limits', 'axis_limits'), ('ACU_axis_warnings', 'axis_warnings'), ('ACU_axis_failures', 'axis_failures'), ('ACU_axis_state', 'axis_state'), ('ACU_oscillation_alarm', 'osc_alarms'), ('ACU_command_status', 'commands'), ('ACU_general_errors', 'ACU_failures_errors'), ('ACU_platform_status', 'platform_status'), ('ACU_emergency', 'ACU_emergency'), ('ACU_corotator', 'corotator'), ]: new_blocks[block_name] = { 'timestamp': self.data['status']['summary']['ctime'], 'block_name': block_name, 'data': self.data['status'][data_key], } # Only keep blocks that have changed or have new data. block_keys = list(new_blocks.keys()) for k in block_keys: if k == 'summary': # always store these, as a sort of reference tick. continue if k not in data_blocks: continue B, N = data_blocks[k], new_blocks[k] if N['timestamp'] - B['timestamp'] > MONITOR_MAX_TIME_DELTA: continue if any([B['data'][_k] != _v for _k, _v in N['data'].items()]): continue del new_blocks[k] for block in new_blocks.values(): self.agent.publish_to_feed('acu_status', block) data_blocks.update(new_blocks) return True, 'Acquisition exited cleanly.'
[docs] @ocs_agent.param('auto_enable', type=bool, default=True) @inlineCallbacks def broadcast(self, session, params): """broadcast(auto_enable=True) **Process** - Read UDP data from the port specified by self.acu_config, decode it, and publish to HK feeds. Full resolution (200 Hz) data are written to feed "acu_udp_stream" while 1 Hz decimated are written to "acu_broadcast_influx". The 1 Hz decimated output are also stored in session.data. Args: auto_enable (bool): If True, the Process will try to configure and (re-)enable the UDP stream if at any point the stream seems to drop out. Notes: The session.data looks like this (this is for a SATP running with servo details in the UDP output):: { "Time": 1679499948.8234625, "Corrected_Azimuth": -20.00112176010607, "Corrected_Elevation": 50.011521050839434, "Corrected_Boresight": 29.998428712246067, "Raw_Azimuth": -20.00112176010607, "Raw_Elevation": 50.011521050839434, "Raw_Boresight": 29.998428712246067, "Azimuth_Current_1": -0.000384521484375, "Azimuth_Current_2": -0.0008331298828125, "Elevation_Current_1": 0.003397979736328125, "Boresight_Current_1": -0.000483856201171875, "Boresight_Current_2": -0.000105743408203125, "Azimuth_Vel_1": -0.000002288818359375, "Azimuth_Vel_2": 0, "Az_Vel_Act": -0.0000011444091796875, "Az_Vel_Des": 0, "Az_Vffw": 0, "Az_Pos_Des": -20.00112176010607, "Az_Pos_Err": 0 } """ FMT = self.udp_schema['format'] FMT_LEN = struct.calcsize(FMT) UDP_PORT = self.udp['port'] # The udp_data list is used as a queue; it contains # struct-unpacked samples from the UDP stream in the form # (time_received, data). udp_data = [] fields = self.udp_schema['fields'] session.data = {} # BroadcastStreamControl instance. stream = self.acu_control.streams['main'] class MonitorUDP(protocol.DatagramProtocol): def datagramReceived(self, data, src_addr): now = time.time() host, port = src_addr offset = 0 while len(data) - offset >= FMT_LEN: d = struct.unpack(FMT, data[offset:offset + FMT_LEN]) udp_data.append((now, d)) offset += FMT_LEN handler = reactor.listenUDP(int(UDP_PORT), MonitorUDP()) influx_data = {} influx_data['Time_bcast_influx'] = [] for i in range(2, len(fields)): influx_data[fields[i].replace(' ', '_') + '_bcast_influx'] = [] best_dt = None active = True last_packet_time = time.time() while session.status in ['running']: now = time.time() if len(udp_data) >= 200: if not active: self.log.info('UDP packets are being received.') active = True last_packet_time = now best_dt = None process_data = udp_data[:200] udp_data = udp_data[200:] for recv_time, d in process_data: data_ctime = sh.timecode(d[0] + d[1] / sh.DAY) if best_dt is None or abs(recv_time - data_ctime) < best_dt: best_dt = recv_time - data_ctime self.data['broadcast']['Time'] = data_ctime influx_data['Time_bcast_influx'].append(data_ctime) for i in range(2, len(d)): self.data['broadcast'][fields[i].replace(' ', '_')] = d[i] influx_data[fields[i].replace(' ', '_') + '_bcast_influx'].append(d[i]) acu_udp_stream = {'timestamp': self.data['broadcast']['Time'], 'block_name': 'ACU_broadcast', 'data': self.data['broadcast'] } self.agent.publish_to_feed('acu_udp_stream', acu_udp_stream) influx_means = {} for key in influx_data.keys(): influx_means[key] = np.mean(influx_data[key]) influx_data[key] = [] acu_broadcast_influx = {'timestamp': influx_means['Time_bcast_influx'], 'block_name': 'ACU_bcast_influx', 'data': influx_means, } self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx) sd = {} for ky in influx_means: sd[ky.split('_bcast_influx')[0]] = influx_means[ky] session.data.update(sd) else: # Consider logging an outage, attempting reconfig. if active and now - last_packet_time > 3: self.log.info('No UDP packets are being received.') active = False next_reconfig = time.time() if not active and params['auto_enable'] and next_reconfig <= time.time(): self.log.info('Requesting UDP stream enable.') try: cfg, raw = yield stream.safe_enable() except Exception as err: self.log.info('Exception while trying to enable stream: {err}', err=err) next_reconfig += 60 self._broadcast_qual = { 'timestamp': now, 'active': active, 'time_offset': best_dt, } yield dsleep(.01) handler.stopListening() return True, 'Acquisition exited cleanly.'
@inlineCallbacks def _check_daq_streams(self, stream): yield session = self.agent.sessions[stream] if session.status != 'running': self.log.warn("Process '%s' is not running" % stream) return False if stream == 'broadcast': timestamp = self.data['broadcast'].get('Time') else: timestamp = self.data['status']['summary'].get('ctime') if timestamp is None: self.log.warn('%s daq stream has no data yet.' % stream) return False delta = time.time() - timestamp if delta > 2: self.log.warn(f'{stream} daq stream has old data ({delta} seconds)') return False return True @inlineCallbacks def _check_ready_motion(self, session): bcast_check = yield self._check_daq_streams('broadcast') if not bcast_check: return False, 'Motion blocked; problem with "broadcast" data acq process.' monitor_check = yield self._check_daq_streams('monitor') if not monitor_check: return False, 'Motion blocked; problem with "monitor" data acq process.' if self.data['status']['platform_status']['Remote_mode'] == 0: self.log.warn('ACU in local mode, cannot perform motion with OCS.') return False, 'ACU not in remote mode.' return True, 'Agent state ok for motion.' @inlineCallbacks def _set_modes(self, az=None, el=None, third=None): """Helper for changing individual axis modes. Respects ignore_axes. When setting one axis it is often necessary to write others as well. The current mode is first queried, and written back unmodified. """ modes = list((yield self.acu_control.mode(size=3))) changes = [False, False, False] for i, (k, v) in enumerate([('az', az), ('el', el), ('third', third)]): if k not in self.ignore_axes and v is not None: changes[i] = True modes[i] = v if not any(changes): return if not changes[2]: yield self.acu_control.mode(modes[:2]) else: yield self.acu_control.mode(modes) @inlineCallbacks def _stop(self, all_axes=False): """Helper for putting all axes in Stop. This will normally just issue acu_control.stop(); but if any axes are being "ignored", and the user has not passed all_axes=True, then it will avoid changing the mode of those axes. """ if all_axes or len(self.ignore_axes) == 0: yield self.acu_control.stop() return yield self._set_modes('Stop', 'Stop', 'Stop') def _get_limit_func(self, axis): """Construct a function limit(x) that will enforce that x is within the configured limits for axis. Returns the funcion and the tuple of limits (lower, upper). """ limits = self.motion_limits[axis.lower()] limits = limits['lower'], limits['upper'] def limit_func(target): return max(min(target, limits[1]), limits[0]) return limit_func, limits @inlineCallbacks def _go_to_axis(self, session, axis, target, state_feedback=None): """Execute a movement, using "Preset" mode, on a specific axis. Args: session: session object variable of the parent operation. axis (str): one of 'Azimuth', 'Elevation', 'Boresight'. target (float): target position. state_feedback (dict): place to record state (see notes). Returns: ok (bool): True if the motion completed successfully and arrived at target position. msg (str): success/error message. Notes: This has various checks to ensure the movement executes as expected and in a timely fashion. In the case that the warning horn sounds, this function should block until that completes, even if the requested position has been achieved (i.e. no actual motion was needed). The state_feedback may be used to pipeline the initial parts of the movement, so two functions aren't trying to command at the same time. The ``state_feedback`` dict should be passed in initialized with ``{'state': 'init'}``. When initial commanding is finished, this function will update it to `state="wait"`, and then on completion to `state="done"`. """ # Step time in event loop. TICK_TIME = 0.1 # Time for which to sample distance for "still" and "moving" # conditions. PROFILE_TIME = 1. # When aborting, how many seconds to use to project a good # stopping position (d = v*t) ABORT_TIME = 2. # Threshold (deg) for declaring that we've reached # destination. THERE_YET = 0.01 # How long to wait after initiation for signs of motion, # before giving up. This is normally within 2 or 3 seconds # (SATP), but in "cold" cases where siren needs to sound, this # can be as long as 12 seconds. MAX_STARTUP_TIME = 13. # How long does it take to sound the warning horn? It takes # 10 seconds. Don't wait longer than this. WARNING_HORN_TOO_LONG = 15. # How long after mode change to Preset should we expect to see # brakes released, except in case that warning horn is # sounding? 3 seconds should be enough. WARNING_HORN_DETECT = 3. # Velocity to assume when computing maximum time a move should # take (to bail out in unforeseen circumstances). There are # other checks in place to catch when the platform has not # started moving or has stopped at the wrong place. So the # timeout computed from this should only activate in cases # where some other commander has taken over and then kept the # platform moving around. UNREASONABLE_VEL = 0.1 # Positive acknowledgment of AcuControl.go_to OK_RESPONSE = b'OK, Command executed.' # Enum for the motion states State = Enum(f'{axis}State', ['INIT', 'WAIT_MOVING', 'WAIT_STILL', 'FAIL', 'DONE']) if state_feedback is None: state_feedback = {} state_feedback['state'] = 'init' # If this axis is "ignore", skip it. for _axis, short_name in [ ('Azimuth', 'az'), ('Elevation', 'el'), ('Boresight', 'third'), ]: if _axis == axis and short_name in self.ignore_axes: self.log.warn('Ignoring requested motion on {axis}', axis=axis) state_feedback['state'] = 'done' yield dsleep(1) return True, 'axis successfully ignored' # Specialization for different axis types. class AxisControl: def get_pos(_self): return self.data['status']['summary'][f'{axis}_current_position'] def get_mode(_self): return self.data['status']['summary'][f'{axis}_mode'] def get_vel(_self): return self.data['status']['summary'][f'{axis}_current_velocity'] def get_active(_self): return bool( self.data['status']['axis_state'][f'{axis}_brakes_released'] and not self.data['status']['axis_state'][f'{axis}_axis_stop']) class AzAxis(AxisControl): @inlineCallbacks def goto(_self, target): result = yield self.acu_control.go_to(az=target) return result class ElAxis(AxisControl): @inlineCallbacks def goto(_self, target): result = yield self.acu_control.go_to(el=target) return result class ThirdAxis(AxisControl): def get_vel(_self): return 0. @inlineCallbacks def goto(_self, target): result = yield self.acu_control.go_3rd_axis(target) return result class LatCorotator(ThirdAxis): def get_pos(_self): return self.data['status']['corotator']['Corotator_current_position'] def get_mode(_self): return self.data['status']['corotator']['Corotator_mode'] def get_active(_self): return bool( self.data['status']['corotator']['Corotator_brakes_released'] and not self.data['status']['corotator']['Corotator_axis_stop']) ctrl = None if axis == 'Azimuth': ctrl = AzAxis() elif axis == 'Elevation': ctrl = ElAxis() elif axis == 'Boresight': if self.acu_config['platform'] in ['ccat', 'lat']: ctrl = LatCorotator() else: ctrl = ThirdAxis() if ctrl is None: return False, f"No configuration for axis={axis}" limit_func, _ = self._get_limit_func(axis) # History of recent distances from target. history = [] def get_history(t): # Returns (ok, hist) where hist is roughly the past t # seconds of position data and ok is whether or not # that much history was actually available. n = int(t // TICK_TIME) + 1 return (n <= len(history)), history[-n:] last_state = None state = State.INIT start_time = None motion_aborted = False assumption_fail = False motion_completed = False give_up_time = None has_never_moved = True warning_horn = False while session.status in ['starting', 'running', 'stopping']: # Time ... now = time.time() if start_time is None: start_time = now time_since_start = now - start_time motion_expected = time_since_start > MAX_STARTUP_TIME # Space ... current_pos, current_vel = ctrl.get_pos(), ctrl.get_vel() distance = abs(target - current_pos) history.append(distance) if give_up_time is None: give_up_time = now + distance / UNREASONABLE_VEL \ + MAX_STARTUP_TIME + 2 * PROFILE_TIME \ + WARNING_HORN_TOO_LONG # Do we seem to be moving / not moving? ok, _d = get_history(PROFILE_TIME) still = ok and (np.std(_d) < 0.01) moving = ok and (np.std(_d) >= 0.01) has_never_moved = (has_never_moved and not moving) near_destination = distance < THERE_YET mode_ok = (ctrl.get_mode() == 'Preset') active_now = ctrl.get_active() # Log only on state changes if state != last_state: _state = f'{axis}.state={state.name}' self.log.info( f'{_state:<30} dt={now - start_time:7.3f} dist={distance:8.3f}') last_state = state # Handle task abort if session.status == 'stopping' and not motion_aborted: target = limit_func(current_pos + current_vel * ABORT_TIME) state = State.INIT motion_aborted = True # Turn "too long" into an immediate exit. if now > give_up_time: self.log.error('Motion did not complete in a timely fashion; exiting.') assumption_fail = True break # Main state machine if state == State.INIT: # Set target position and change mode to Preset. result = yield ctrl.goto(target) if result == OK_RESPONSE: state = State.WAIT_MOVING else: self.log.error(f'ACU rejected go_to with message: {result}') state = State.FAIL # Reset the clock for tracking "still" / "moving". history = [] start_time = time.time() elif state == State.WAIT_MOVING: # Position and mode change requested, now wait for # either mode change or clear failure of motion. if mode_ok: if active_now: state = state.WAIT_STILL elif time_since_start > WARNING_HORN_TOO_LONG: self.log.error('Warning horn too long!') state = state.FAIL elif time_since_start > WARNING_HORN_DETECT and not warning_horn: warning_horn = True self.log.info('Warning horn is probably sounding.') elif still and motion_expected: self.log.error(f'Motion did not start within {MAX_STARTUP_TIME:.1f} s.') state = state.FAIL elif state == State.WAIT_STILL: # Once moving, watch for end of motion. state_feedback['state'] = 'wait' if not mode_ok: self.log.error('Unexpected axis mode transition; exiting.') state = State.FAIL elif still: if near_destination: state = State.DONE elif has_never_moved and motion_expected: # The settling time, near a soft limit, can be # a bit long ... so only timeout on # motion_expected if we've never moved at all. self.log.error(f'Motion did not start within {MAX_STARTUP_TIME:.1f} s.') state = State.FAIL elif state == State.FAIL: # Move did not complete as planned. assumption_fail = True break elif state == State.DONE: # We seem to have arrived at destination. motion_completed = True break # Keep only ~20 seconds of history ... _, history = get_history(20.) yield dsleep(TICK_TIME) success = motion_completed and not (motion_aborted or assumption_fail) if success: msg = 'Move complete.' elif motion_aborted: msg = 'Move aborted!' else: msg = 'Irregularity during motion!' state_feedback['state'] = 'done' return success, msg @inlineCallbacks def _go_to_axes(self, session, el=None, az=None, third=None, clear_faults=False): """Execute a movement along multiple axes, using "Preset" mode. This just launches _go_to_axis on each required axis, and collects the results. Args: session: session object variable of the parent operation. az (float): target for Azimuth axis (ignored if None). el (float): target for Elevation axis (ignored if None). third (float): target for Boresight axis (ignored if None). clear_faults (bool): whether to clear ACU faults first. Returns: ok (bool): True if all motions completed successfully and arrived at target position. msg (str): success/error message (combined from each target axis). """ # Construct args for each _go_to_axis command... don't create # the Deferred here, because we will want to clear_faults # first (and the Deferred might start running before that # completes). move_defs = [] for axis_name, short_name, target in [ ('Azimuth', 'az', az), ('Elevation', 'el', el), ('Boresight', 'third', third), ]: if target is not None: move_defs.append( (short_name, (session, axis_name, target))) if len(move_defs) is None: return True, 'No motion requested.' if clear_faults: yield self.acu_control.clear_faults() yield dsleep(1) # Start each move, waiting for each to pass the "init" state # before beginning the next one. moves = [] for name, args in move_defs: fb = {'state': 'init'} move_def = self._go_to_axis(*args, state_feedback=fb) while fb['state'] == 'init': yield dsleep(.1) moves.append(move_def) # Now wait for all to complete. moves = yield DeferredList(moves) all_ok, msgs = True, [] for _ok, result in moves: if _ok: all_ok = all_ok and result[0] msgs.append(result[1]) else: all_ok = False msgs.append(f'Crash! {result}') if all_ok: msg = msgs[0] else: msg = ' '.join([f'{name}: {msg}' for (name, args), msg in zip(move_defs, msgs)]) return all_ok, msg
[docs] @ocs_agent.param('az', type=float) @ocs_agent.param('el', type=float) @ocs_agent.param('end_stop', default=False, type=bool) @inlineCallbacks def go_to(self, session, params): """go_to(az, el, end_stop=False) **Task** - Move the telescope to a particular point (azimuth, elevation) in Preset mode. When motion has ended and the telescope reaches the preset point, it returns to Stop mode and ends. Parameters: az (float): destination angle for the azimuth axis el (float): destination angle for the elevation axis end_stop (bool): put the telescope in Stop mode at the end of the motion """ with self.azel_lock.acquire_timeout(0, job='go_to') as acquired: if not acquired: return False, f"Operation failed: {self.azel_lock.job} is running." if self._get_sun_policy('motion_blocked'): return False, "Motion blocked; Sun avoidance in progress." self.log.info('Clearing faults to prepare for motion.') yield self.acu_control.clear_faults() yield dsleep(1) ok, msg = yield self._check_ready_motion(session) if not ok: return False, msg target_az = params['az'] target_el = params['el'] for axis, target in {'azimuth': target_az, 'elevation': target_el}.items(): limit_func, limits = self._get_limit_func(axis) if target != limit_func(target): raise ocs_agent.ParamError( f'{axis}={target} not in accepted range, ' f'[{limits[0]}, {limits[1]}].') self.log.info(f'Requested position: az={target_az}, el={target_el}') legs, msg = yield self._get_sunsafe_moves(target_az, target_el, zero_legs_ok=False) if msg is not None: self.log.error(msg) return False, msg if len(legs) > 1: self.log.info(f'Executing move via {len(legs)} separate legs (sun optimized)') for leg_az, leg_el in legs: all_ok, msg = yield self._go_to_axes(session, az=leg_az, el=leg_el) if not all_ok: break if all_ok and params['end_stop']: yield self._set_modes(az='Stop', el='Stop') return all_ok, msg
[docs] @ocs_agent.param('target', type=float) @ocs_agent.param('end_stop', default=False, type=bool) @inlineCallbacks def set_boresight(self, session, params): """set_boresight(target, end_stop=False) **Task** - Move the telescope to a particular third-axis angle. Parameters: target (float): destination angle for boresight rotation end_stop (bool): put axes in Stop mode after motion """ with self.boresight_lock.acquire_timeout(0, job='set_boresight') as acquired: if not acquired: return False, f"Operation failed: {self.boresight_lock.job} is running." self.log.info('Clearing faults to prepare for motion.') yield self.acu_control.clear_faults() yield dsleep(1) ok, msg = yield self._check_ready_motion(session) if not ok: return False, msg target = params['target'] for axis, target in {'boresight': target}.items(): limit_func, limits = self._get_limit_func(axis) if target != limit_func(target): raise ocs_agent.ParamError( f'{axis}={target} not in accepted range, ' f'[{limits[0]}, {limits[1]}].') self.log.info(f'Commanded position: boresight={target}') ok, msg = yield self._go_to_axis(session, 'Boresight', target) if ok and params['end_stop']: yield self._set_modes(third='Stop') return ok, msg
[docs] @ocs_agent.param('target') @ocs_agent.param('end_stop', default=True, type=bool) @inlineCallbacks def go_to_named(self, session, params): """go_to_named(target, end_stop=True) **Task** - Move the telescope to a named position, e.g. "home", that has been configured through command line args. Parameters: target (str): name of the target position. end_stop (bool): put axes in Stop mode after motion """ target = self.named_positions.get(params['target']) if target is None: return False, 'Position "%s" is not configured.' % params['target'] ok, msg, _session = self.agent.start('go_to', {'az': target[0], 'el': target[1], 'end_stop': params['end_stop']}) if ok == ocs.ERROR: return False, 'Failed to start go_to task.' ok, msg, _session = yield self.agent.wait('go_to') return (ok == ocs.OK), msg
[docs] @ocs_agent.param('speed_mode', choices=['high', 'low']) @inlineCallbacks def set_speed_mode(self, session, params): """set_speed_mode(speed_mode) **Task** - Set the ACU Speed Mode. This affects motion when in Preset mode, such as when using go_to in this Agent. It should not affect the speed of scans done in ProgramTrack mode. Parameters: speed_mode (str): 'high' or 'low'. Notes: The axes must be in Stop mode for this to work. This task will return an error if the command appears to have failed. The actual speed and acceleration settings for the "high" and "low" (perhaps called "aux") settings must be configured on the ACU front panel. """ http = aculib.streams.ModularHttpInterface( self.acu_config['dev_url'], backend=TwistedHttpBackend()) data = 'Command=Set Speed ' + params['speed_mode'].capitalize() resp_bytes = yield http.Post(data, 'DataSets.CmdGeneralTransfer', '3') resp = resp_bytes.decode('utf8') if '<p>Status: executed</p>' in resp: return True, "Speed mode changed." elif '<p>Status: not allowed</p>' in resp: return False, "Mode change blocked (are you in Stop?)" else: return False, "Response was not as expected."
def _set_default_scan_params(self): # A reference to scan_params is cached in monitor, so copy # individual items rather than creating a new dict here. for k, v in DEFAULT_SCAN_PARAMS[self.acu_config['platform']].items(): self.scan_params[k] = v
[docs] @ocs_agent.param('az_speed', type=float, default=None) @ocs_agent.param('az_accel', type=float, default=None) @ocs_agent.param('reset', default=False, type=bool) @inlineCallbacks def set_scan_params(self, session, params): """set_scan_params(az_speed=None, az_accel=None, reset=False)) **Task** - Update the default scan parameters, used by generate_scan if not passed explicitly. Parameters: az_speed (float, optional): The azimuth scan speed. az_accel (float, optional): The (average) azimuth acceleration at turn-around. reset (bool, optional): If True, reset all params to default values before applying any updates passed explicitly here. """ if params['reset']: self._set_default_scan_params() for k in ['az_speed', 'az_accel']: if params[k] is not None: self.scan_params[k] = params[k] self.log.info('Updated default scan params to {sp}', sp=self.scan_params) yield return True, 'Done'
[docs] @ocs_agent.param('_') @inlineCallbacks def clear_faults(self, session, params): """clear_faults() **Task** - Clear any axis faults. """ yield self.acu_control.clear_faults() session.set_status('stopping') return True, 'Job completed.'
[docs] @ocs_agent.param('all_axes', default=False, type=bool) @inlineCallbacks def stop_and_clear(self, session, params): """stop_and_clear(all_axes=False) **Task** - Change the azimuth, elevation, and 3rd axis modes to Stop; also clear the ProgramTrack stack. Args: all_axes (bool): Send Stop to all axes, even ones user has requested to be ignored. """ def _read_modes(): modes = [self.data['status']['summary']['Azimuth_mode'], self.data['status']['summary']['Elevation_mode']] if self.acu_config['platform'] == 'satp': modes.append(self.data['status']['summary']['Boresight_mode']) elif self.acu_config['platform'] in ['ccat', 'lat']: modes.append(self.data['status']['corotator']['Corotator_mode']) return modes for i in range(6): for short_name, mode in zip(['az', 'el', 'third'], _read_modes()): if (params['all_axes'] or short_name not in self.ignore_axes) and mode != 'Stop': break else: self.log.info('All axes in Stop mode') break yield self._stop(params['all_axes']) self.log.info('Stop called (iteration %i)' % (i + 1)) yield dsleep(0.1) else: msg = 'Failed to set all axes to Stop mode!' self.log.error(msg) return False, msg for i in range(6): free_stack = self.data['status']['summary']['Free_upload_positions'] if free_stack < FULL_STACK: yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', 'Clear Stack') self.log.info('Clear Stack called (iteration %i)' % (i + 1)) yield dsleep(0.1) else: self.log.info('Stack cleared') break else: msg = 'Failed to clear the ProgramTrack stack!' self.log.warn(msg) return False, msg session.set_status('stopping') return True, 'Job completed'
[docs] @ocs_agent.param('filename', type=str) @ocs_agent.param('adjust_times', type=bool, default=True) @ocs_agent.param('azonly', type=bool, default=True) @inlineCallbacks def fromfile_scan(self, session, params=None): """fromfile_scan(filename, adjust_times=True, azonly=True) **Task** - Upload and execute a scan pattern from numpy file. Parameters: filename (str): full path to desired numpy file. File should contain an array of shape (5, nsamp) or (7, nsamp). See Note. adjust_times (bool): If True (the default), the track timestamps are interpreted as relative times, only, and the track will be formatted so the first point happens a few seconds in the future. If False, the track times will be taken at face value (even if the first one is, like, 0). azonly (bool): If True, the elevation part of the track will be uploaded but the el axis won't be put in ProgramTrack mode. It might be put in Stop mode though. Notes: The columns in the numpy array are: - 0: timestamps, in seconds. - 1: azimuth, in degrees. - 2: elevation, in degrees. - 3: az_vel, in deg/s. - 4: el_vel, in deg/s. - 5: az_flags (2 if last point in a leg; 1 otherwise.) - 6: el_flags (2 if last point in a leg; 1 otherwise.) It is acceptable to omit columns 5 and 6. """ times, azs, els, vas, ves, azflags, elflags = sh.from_file(params['filename']) if min(azs) <= self.motion_limits['azimuth']['lower'] \ or max(azs) >= self.motion_limits['azimuth']['upper']: return False, 'Azimuth location out of range!' if min(els) <= self.motion_limits['elevation']['lower'] \ or max(els) >= self.motion_limits['elevation']['upper']: return False, 'Elevation location out of range!' # Modify times? if params['adjust_times']: times = times + time.time() - times[0] + 5. # Turn those lines into a generator. all_lines = sh.ptstack_format(times, azs, els, vas, ves, azflags, elflags, absolute=True) def line_batcher(lines, n=10): while len(lines): some, lines = lines[:n], lines[n:] yield some point_gen = line_batcher(all_lines) step_time = np.median(np.diff(times)) ok, err = yield self._run_track(session, point_gen, step_time, azonly=params['azonly']) return ok, err
[docs] @ocs_agent.param('az_endpoint1', type=float) @ocs_agent.param('az_endpoint2', type=float) @ocs_agent.param('az_speed', type=float, default=None) @ocs_agent.param('az_accel', type=float, default=None) @ocs_agent.param('el_endpoint1', type=float, default=None) @ocs_agent.param('el_endpoint2', type=float, default=None) @ocs_agent.param('el_speed', type=float, default=0.) @ocs_agent.param('num_scans', type=float, default=None) @ocs_agent.param('start_time', type=float, default=None) @ocs_agent.param('wait_to_start', type=float, default=None) @ocs_agent.param('step_time', type=float, default=None) @ocs_agent.param('az_start', default='end', choices=['end', 'mid', 'az_endpoint1', 'az_endpoint2', 'mid_inc', 'mid_dec']) @ocs_agent.param('az_drift', type=float, default=None) @ocs_agent.param('az_only', type=bool, default=True) @ocs_agent.param('scan_upload_length', type=float, default=None) @inlineCallbacks def generate_scan(self, session, params): """generate_scan(az_endpoint1, az_endpoint2, \ az_speed=None, az_accel=None, \ el_endpoint1=None, el_endpoint2=None, \ el_speed=None, \ num_scans=None, start_time=None, \ wait_to_start=None, step_time=None, \ az_start='end', az_drift=None, az_only=True, \ scan_upload_length=None) **Process** - Scan generator, currently only works for constant-velocity az scans with fixed elevation. Parameters: az_endpoint1 (float): first endpoint of a linear azimuth scan az_endpoint2 (float): second endpoint of a linear azimuth scan az_speed (float): azimuth speed for constant-velocity scan az_accel (float): turnaround acceleration for a constant-velocity scan el_endpoint1 (float): first endpoint of elevation motion. In the present implementation, this will be the constant elevation declared at every point in the track. el_endpoint2 (float): this is ignored. el_speed (float): this is ignored. num_scans (int or None): if not None, limits the scan to the specified number of constant velocity legs. The process will exit without error once that has completed. start_time (float or None): a unix timestamp giving the time at which the scan should begin. The default is None, which means the scan will start immediately (but taking into account the value of wait_to_start). wait_to_start (float): number of seconds to wait before starting a scan, in the case that start_time is None. The default is to compute a minimum time based on the scan parameters and the ACU ramp-up algorithm; this is typically 5-10 seconds. step_time (float): time, in seconds, between points on the constant-velocity parts of the motion. The default is None, which will cause an appropriate value to be chosen automatically (typically 0.1 to 1.0). az_start (str): part of the scan to start at. To start at one of the extremes, use 'az_endpoint1', 'az_endpoint2', or 'end' (same as 'az_endpoint1'). To start in the midpoint of the scan use 'mid_inc' (for first half-leg to have positive az velocity), 'mid_dec' (negative az velocity), or 'mid' (velocity oriented towards endpoint2). az_drift (float): if set, this should be a drift velocity in deg/s. The scan extrema will move accordingly. This can be used to better follow compact sources as they rise or set through the focal plane. az_only (bool): if True (the default), then only the Azimuth axis is put in ProgramTrack mode, and the El axis is put in Stop mode. scan_upload_length (float): number of seconds for each set of uploaded points. If this is not specified, the track manager will try to use as short a time as is reasonable. Notes: Note that all parameters are optional except for az_endpoint1 and az_endpoint2. If only those two parameters are passed, the Process will scan between those endpoints, with the elevation axis held in Stop, indefinitely (until Process .stop method is called).. """ if self._get_sun_policy('motion_blocked'): return False, "Motion blocked; Sun avoidance in progress." self.log.info('User scan params: {params}', params=params) az_endpoint1 = params['az_endpoint1'] az_endpoint2 = params['az_endpoint2'] el_endpoint1 = params['el_endpoint1'] el_endpoint2 = params['el_endpoint2'] # Params with defaults configured ... az_speed = params['az_speed'] az_accel = params['az_accel'] if az_speed is None: az_speed = self.scan_params['az_speed'] if az_accel is None: az_accel = self.scan_params['az_accel'] # Do we need to limit the az_accel? This limit comes from a # maximum jerk parameter; the equation below (without the # empirical 0.85 adjustment) is stated in the SATP ACU ICD. min_turnaround_time = (0.85 * az_speed / 9 * 11.616)**.5 max_turnaround_accel = 2 * az_speed / min_turnaround_time if az_accel > max_turnaround_accel: self.log.warn('WARNING: user requested accel=%.2f; limiting to %.2f' % (az_accel, max_turnaround_accel)) az_accel = max_turnaround_accel # If el is not specified, drop in the current elevation. if el_endpoint1 is None: el_endpoint1 = self.data['status']['summary']['Elevation_current_position'] if el_endpoint2 is None: el_endpoint2 = el_endpoint1 # If requested el is just outside acceptable range, tweak it in. _f, _ = self._get_limit_func('elevation') el_endpoint1, _untweaked_el = _f(el_endpoint1), el_endpoint1 if abs(el_endpoint1 - _untweaked_el) > 0.1: return False, "Current elevation (%.4f) is well outside limits." % _untweaked_el init_el = el_endpoint1 azonly = params.get('az_only', True) scan_upload_len = params.get('scan_upload_length') scan_params = {k: params.get(k) for k in [ 'num_scans', 'num_batches', 'start_time', 'wait_to_start', 'step_time', 'batch_size', 'az_start', 'az_drift'] if params.get(k) is not None} el_speed = params.get('el_speed', 0.0) plan = sh.plan_scan(az_endpoint1, az_endpoint2, el=el_endpoint1, v_az=az_speed, a_az=az_accel, az_start=scan_params.get('az_start')) # Use the plan to set scan upload parameters. if scan_params.get('step_time') is None: scan_params['step_time'] = plan['step_time'] if scan_params.get('wait_to_start') is None: scan_params['wait_to_start'] = plan['wait_to_start'] step_time = scan_params['step_time'] point_batch_count = None if scan_upload_len: point_batch_count = scan_upload_len / step_time self.log.info('The plan: {plan}', plan=plan) self.log.info('The scan_params: {scan_params}', scan_params=scan_params) # Before any motion, check for sun safety. ok, msg = self._check_scan_sunsafe(az_endpoint1, az_endpoint2, el_endpoint1, az_speed, az_accel) if ok: self.log.info('Sun safety check: {msg}', msg=msg) else: self.log.error('Sun safety check fails: {msg}', msg=msg) return False, 'Scan is not Sun Safe.' # Clear faults. self.log.info('Clearing faults to prepare for motion.') yield self.acu_control.clear_faults() yield dsleep(1) # Verify we're good to move ok, msg = yield self._check_ready_motion(session) if not ok: return False, msg # Seek to starting position. Note we ask for at least one leg # here, because go_to_axes knows how to wait for the warning # horn to finish before returning, which relieves us from # handling that delay in the (already onerous) scan point # timing. self.log.info(f'Moving to start position, az={plan["init_az"]}, el={init_el}') legs, msg = yield self._get_sunsafe_moves(plan['init_az'], init_el, zero_legs_ok=False) if msg is not None: self.log.error(msg) return False, msg for leg_az, leg_el in legs: ok, msg = yield self._go_to_axes(session, az=leg_az, el=leg_el) if not ok: return False, f'Start position seek failed with message: {msg}' # Prepare the point generator. g = sh.generate_constant_velocity_scan(az_endpoint1=az_endpoint1, az_endpoint2=az_endpoint2, az_speed=az_speed, acc=az_accel, el_endpoint1=el_endpoint1, el_endpoint2=el_endpoint2, el_speed=el_speed, az_first_pos=plan['init_az'], **scan_params) return (yield self._run_track( session=session, point_gen=g, step_time=step_time, azonly=azonly, point_batch_count=point_batch_count))
@inlineCallbacks def _run_track(self, session, point_gen, step_time, azonly=False, point_batch_count=None): """Run a ProgramTrack track scan, with points provided by a generator. Args: session: session object for the parent operation. point_gen: generator that yields points step_time: the minimum time between point track points. This is used to guarantee that points are uploaded sufficiently in advance for the servo unit to process them. azonly: set to True to leave the el axis locked. point_batch_count: number of points to include in batch uploads. This parameter can be used to increase the value beyond the minimum set internally based on step_time. Returns: Tuple (success, msg) where success is a bool. """ # The approximate loop time LOOP_STEP = 0.1 # seconds # Time to allow for initial ProgramTrack transition. MAX_PROGTRACK_SET_TIME = 5. # Minimum number of points to have in the stack. While the # docs strictly require 4, this number should be at least 1 # more than that to allow for rounding when we are setting the # refill threshold. MIN_STACK_POP = 6 # points if point_batch_count is None: point_batch_count = 0 STACK_REFILL_THRESHOLD = FULL_STACK - \ max(MIN_STACK_POP + LOOP_STEP / step_time, point_batch_count) STACK_TARGET = FULL_STACK - \ max(MIN_STACK_POP * 2 + LOOP_STEP / step_time, point_batch_count * 2) # Special error bits to watch here PTRACK_FAULT_KEYS = [ 'ProgramTrack_position_failure', 'Track_start_too_early', 'Turnaround_accel_too_high', 'Turnaround_time_too_short', ] with self.azel_lock.acquire_timeout(0, job='generate_scan') as acquired: if not acquired: return False, f"Operation failed: {self.azel_lock.job} is running." if session.status not in ['starting', 'running']: return False, "Operation aborted before motion began." yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', 'Clear Stack') yield dsleep(0.5) if azonly: yield self._set_modes(az='ProgramTrack') else: yield self._set_modes(az='ProgramTrack', el='ProgramTrack') yield dsleep(0.1) # Values for mode are: # - 'go' -- keep uploading points (unless there are no more to upload). # - 'stop' -- do not request more points from generator; finish the ones you have. # - 'abort' -- do not upload more points; exit loop and clear stack. mode = 'go' lines = [] last_mode = None was_graceful_exit = True start_time = time.time() got_progtrack = False faults = {} got_points_in = False first_upload_time = None while True: now = time.time() current_modes = {'Az': self.data['status']['summary']['Azimuth_mode'], 'El': self.data['status']['summary']['Elevation_mode'], 'Remote': self.data['status']['platform_status']['Remote_mode']} free_positions = self.data['status']['summary']['Free_upload_positions'] # Use this var to detect case where we're uploading # points but ACU is quietly dumping them because the # vel is too high. got_points_in = got_points_in \ or (got_progtrack and free_positions < FULL_STACK) if last_mode != mode: self.log.info(f'scan mode={mode}, line_buffer={len(lines)}, track_free={free_positions}') last_mode = mode for k in PTRACK_FAULT_KEYS: if k not in faults and self.data['status']['ACU_failures_errors'].get(k): self.log.info('Fault during track: "{k}"', k=k) faults[k] = True if mode != 'abort': # Reasons we might decide to abort ... if current_modes['Az'] == 'ProgramTrack': got_progtrack = True else: if got_progtrack: self.log.warn('Unexpected exit from ProgramTrack mode!') mode = 'abort' was_graceful_exit = False elif now - start_time > MAX_PROGTRACK_SET_TIME: self.log.warn('Failed to set ProgramTrack mode in a timely fashion.') mode = 'abort' was_graceful_exit = False if not got_points_in and (first_upload_time is not None) \ and (now - first_upload_time > 10): self.log.warn('ACU seems to be dumping our track. Vel too high?') mode = 'abort' if current_modes['Remote'] == 0: self.log.warn('ACU no longer in remote mode!') mode = 'abort' was_graceful_exit = False if session.status == 'stopping': mode = 'abort' if mode == 'abort': lines = [] # Is it time to upload more lines? if free_positions >= STACK_REFILL_THRESHOLD: new_line_target = max(int(free_positions - STACK_TARGET), 1) # Make sure that you get one more line than you # need (len(lines) > new_line_target), so that # after "grabbing the minimum batch", below, there # is still >= 1 line left. The lines-is-empty # check is used to decide we're done. while mode == 'go' and (len(lines) <= new_line_target or lines[-1][0] != 0): try: lines.extend(next(point_gen)) except StopIteration: mode = 'stop' # Grab the minimum batch upload_lines, lines = lines[:new_line_target], lines[new_line_target:] # If the last line has a "group" flag, keep transferring lines. while len(lines) and len(upload_lines) and upload_lines[-1][0] != 0: upload_lines.append(lines.pop(0)) if len(upload_lines): # Discard the group flag and upload all. text = ''.join([line for _flag, line in upload_lines]) # This seems to return b'Ok.' no matter ~what, # so not much point checking it. yield self.acu_control.http.UploadPtStack(text) if first_upload_time is None: first_upload_time = time.time() if len(lines) == 0 and free_positions >= FULL_STACK - 1: break yield dsleep(LOOP_STEP) # Go to Stop mode? # yield self.acu_control.stop() # Clear the stack, but wait a bit or it can cause a fault. # Yes, sometimes you have to wait a very long time ... yield dsleep(10) yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer', 'Clear Stack') if not was_graceful_exit: return False, 'Problems during scan' return True, 'Scan ended cleanly' # # Sun Safety Monitoring and Active Avoidance # def _reset_sun_params(self, enabled=None, radius=None): """Resets self.sun_params based on defaults for this platform. Note if enabled or radius are specified here, they update the defaults (so they endure for the life of the agent). """ config = SUN_CONFIGS[self.acu_config['platform']] # These params update config for the entire run of agent. if enabled is not None: config['enabled'] = bool(enabled) if radius is not None: config['policy']['exclusion_radius'] = radius _p = { # Global enable (but see "disable_until"). 'active_avoidance': False, # Can be set to a timestamp, in which case Sun Avoidance # is disabled until that time has passed. 'disable_until': 0, # Flag for indicating normal motions should be blocked # (Sun Escape is active). 'block_motion': False, # Flag for update_sun to indicate Sun map needs recomputed 'recompute_req': False, # If set, should be a timestamp at which escape_sun_now # will be initiated. 'next_drill': None, # Parameters for the Sun Safety Map computation. 'safety_map_kw': { 'sun_time_shift': 0, }, # Avoidance policy, for use in avoidance decisions. 'policy': None, } # Populate default policy based on platform. _p['active_avoidance'] = config['enabled'] _p['policy'] = config['policy'] # And add in platform limits and move policies _p['policy'].update({ 'min_az': self.motion_limits['azimuth']['lower'], 'max_az': self.motion_limits['azimuth']['upper'], 'min_el': self.motion_limits['elevation']['lower'], 'max_el': self.motion_limits['elevation']['upper'], 'axes_sequential': self.motion_limits.get('axes_sequential', False), }) self.sun_params = _p def _get_sun_policy(self, key): now = time.time() p = self.sun_params active = (p['active_avoidance'] and (now >= p['disable_until'])) if key == 'motion_blocked': return active and p['block_motion'] elif key == 'sunsafe_moves': return active elif key == 'escape_enabled': return active elif key == 'map_valid': return (self.sun is not None and self.sun.base_time is not None and self.sun.base_time <= now and self.sun.base_time >= now - 2 * SUN_MAP_REFRESH) else: return p[key]
[docs] @ocs_agent.param('_') @inlineCallbacks def monitor_sun(self, session, params): """monitor_sun() **Process** - Monitors and reports the position of the Sun; maintains a Sun Safety Map for verifying that moves and scans are Sun-safe; triggers a "Sun escape" if the boresight enters an unsafe position. The monitoring functions are always active (as long as this process is running). But the escape functionality must be explicitly enabled (through the default platform configuration, command line arguments, or the update_sun task). Session data looks like this:: { "timestamp": 1698848292.5579932, "active_avoidance": false, "disable_until": 0, "block_motion": false, "recompute_req": false, "next_drill": null, "safety_map_kw": { "sun_time_shift": 0 }, "policy": { "exclusion_radius": 20, "el_horizon": 10, "min_sun_time": 1800, "response_time": 7200, "min_az": -90, "max_az": 450, "min_el": 18.5, "max_el": 90 }, "sun_pos": { "map_exists": true, "map_is_old": false, "map_ref_time": 1698848179.1123455, "platform_azel": [ 90.0158, 20.0022 ], "sun_radec": [ 216.50815789438036, -14.461844389380719 ], "sun_azel": [ 78.24269024936028, 60.919554369324096 ], "sun_dist": 41.75087242151837, "sun_safe_time": 71760 }, "avoidance": { "safety_unknown": false, "warning_zone": false, "danger_zone": false, "escape_triggered": false, "escape_active": false, "last_escape_time": 0, "sun_is_real": true, "platform_is_moveable": true } } In debugging, the Sun position might be falsified. In that case the "sun_pos" subtree will contain an entry like this:: "WARNING": "Fake Sun Position is in use!", and "avoidance": "sun_is_real" will be set to false. (No other functionality is changed when using a falsified Sun position; flags are computed and actions decided based on the false position.) """ def _get_sun_map(): # To run in thread ... start = time.time() new_sun = avoidance.SunTracker(policy=self.sun_params['policy'], **self.sun_params['safety_map_kw']) return new_sun, time.time() - start def _notify_recomputed(result): nonlocal req_out new_sun, compute_time = result self.log.info('(Re-)computed Sun Safety Map (took %.1fs)' % compute_time) self.sun = new_sun req_out = False def lookup(keys, tree): if isinstance(keys, str): keys = [keys] if len(keys) == 0: if isinstance(tree, (bool, np.bool_)): return int(tree) return tree return lookup(keys[1:], tree[keys[0]]) # Feed -- unpack some elements of session.data feed_keys = { 'sun_avoidance': ('active_avoidance', int), 'sun_az': (('sun_pos', 'sun_azel', 0), float), 'sun_el': (('sun_pos', 'sun_azel', 1), float), 'sun_dist': (('sun_pos', 'sun_dist'), float), 'sun_safe_time': (('sun_pos', 'sun_safe_time'), float), } for k in ['warning_zone', 'danger_zone', 'escape_triggered', 'escape_active']: feed_keys[f'sun_{k}'] = (('avoidance', k), int) feed_pacer = Pacemaker(.1) req_out = False self.sun = None last_panic = 0 session.data = {} while session.status in ['starting', 'running']: new_data = { 'timestamp': time.time(), } new_data.update(self.sun_params) try: az, el = [self.data['status']['summary'][f'{ax}_current_position'] for ax in ['Azimuth', 'Elevation']] if az is None or el is None: raise KeyError except KeyError: az, el = None, None try: moveable = [bool(self.data['status']['platform_status'][k]) for k in ['Safe_mode', 'Remote_mode']] moveable = (not moveable[0]) and moveable[1] except KeyError: moveable = False no_map = self.sun is None old_map = (not no_map and self.sun._now() - self.sun.base_time > SUN_MAP_REFRESH) do_recompute = ( not req_out and (no_map or old_map or self.sun_params['recompute_req']) ) if do_recompute: req_out = True self.sun_params['recompute_req'] = False threads.deferToThread(_get_sun_map).addCallback( _notify_recomputed) new_data.update({ 'sun_pos': { 'map_exists': not no_map, 'map_is_old': old_map, 'map_ref_time': None if no_map else self.sun.base_time, 'platform_azel': (az, el), }, }) # Flags for unsafe position. safety_known, danger_zone, warning_zone = False, False, False # Flag for time shift during debugging. sun_is_real = True if self.sun is not None: info = self.sun.get_sun_pos(az, el) sun_is_real = ('WARNING' not in info) new_data['sun_pos'].update(info) if az is not None: t = self.sun.check_trajectory([az], [el])['sun_time'] new_data['sun_pos']['sun_safe_time'] = t if t > 0 else 0 safety_known = True danger_zone = (t < self.sun_params['policy']['min_sun_time']) warning_zone = (t < self.sun_params['policy']['response_time']) # Has a drill been requested? drill_req = (self.sun_params['next_drill'] is not None and self.sun_params['next_drill'] <= time.time()) # Should we be doing a escape_sun_now? panic_for_real = safety_known and danger_zone and self._get_sun_policy('escape_enabled') panic_for_fun = drill_req # Is escape_sun_now task running? ok, msg, _session = self.agent.status('escape_sun_now') escape_in_progress = (_session.get('status', 'done') != 'done') # Block motion as long as we are not sun-safe. self.sun_params['block_motion'] = (panic_for_real or escape_in_progress) new_data['avoidance'] = { 'safety_unknown': not safety_known, 'warning_zone': warning_zone, 'danger_zone': danger_zone, 'escape_triggered': panic_for_real, 'escape_active': escape_in_progress, 'last_escape_time': last_panic, 'sun_is_real': sun_is_real, 'platform_is_moveable': moveable, } if (panic_for_real or panic_for_fun): now = time.time() # Different retry conditions for moveable / not moveable if moveable and (now - last_panic > 60.): # When moveable, only attempt escape every 1 minute. self.log.warn('monitor_sun is requesting escape_sun_now.') self.agent.start('escape_sun_now') last_panic = now elif not moveable and (now - last_panic > 600.): # When not moveable, only print complaint message every 10 minutes. self.log.warn('monitor_sun cannot request escape_sun_now, ' 'because platform not moveable by remote!') last_panic = now # Regardless, clear the drill indicator -- we don't # want that to occur randomly later. self.sun_params['next_drill'] = None # Update session. session.data.update(new_data) # Publish -- only if we have the sun pos though.. if sun_is_real and safety_known and feed_pacer.next_sample <= time.time(): feed_pacer.sleep() # should be instantaneous, just update counters block = {'timestamp': time.time(), 'block_name': 'sun0', 'data': {}} for kshort, (keys, cast) in feed_keys.items(): block['data'][kshort] = cast(lookup(keys, new_data)) self.agent.publish_to_feed('sun', block) yield dsleep(1) return True, 'monitor_sun exited cleanly.'
[docs] @ocs_agent.param('reset', type=bool, default=None) @ocs_agent.param('enable', type=bool, default=None) @ocs_agent.param('temporary_disable', type=float, default=None) @ocs_agent.param('escape', type=bool, default=None) @ocs_agent.param('avoidance_radius', type=float, default=None) @ocs_agent.param('shift_sun_hours', type=float, default=None) def update_sun(self, session, params): """update_sun(reset=None, enable=None, temporary_disable=None, escape=None, avoidance_radius=None, shift_sun_hours=None) **Task** - Update Sun monitoring and avoidance parameters. All arguments are optional. Args: reset (bool): If True, reset all sun_params to the platform defaults. (The "defaults" includes any overrides specified on Agent command line.) enable (bool): If True, enable active Sun avoidance. If avoidance was temporarily disabled it is re-enabled. If False, disable active Sun avoidance (non-temporarily). temporary_disable (float): If set, disable Sun avoidance for this number of seconds. escape (bool): If True, schedule an escape drill for 10 seconds from now. avoidance_radius (float): If set, change the FOV radius (degrees), for Sun avoidance purposes, to this number. shift_sun_hours (float): If set, compute the Sun position as though it were this many hours in the future. This is for debugging, testing, and work-arounds. Pass zero to cancel. """ do_recompute = False now = time.time() self.log.info('update_sun params: {params}', params={k: v for k, v in params.items() if v is not None}) if params['reset']: self._reset_sun_params() do_recompute = True if params['enable'] is not None: self.sun_params['active_avoidance'] = params['enable'] self.sun_params['disable_until'] = 0 if params['temporary_disable'] is not None: self.sun_params['disable_until'] = params['temporary_disable'] + now if params['escape']: self.log.warn('Setting sun escape drill to start in 10 seconds.') self.sun_params['next_drill'] = now + 10 if params['avoidance_radius'] is not None: self.sun_params['policy']['exclusion_radius'] = \ params['avoidance_radius'] do_recompute = True if params['shift_sun_hours'] is not None: self.sun_params['safety_map_kw']['sun_time_shift'] = \ params['shift_sun_hours'] * 3600 do_recompute = True if do_recompute: self.sun_params['recompute_req'] = True return True, 'Params updated.'
[docs] @ocs_agent.param('_') @inlineCallbacks def escape_sun_now(self, session, params): """escape_sun_now() **Task** - Take control of the platform, and move it to a Sun-Safe position. This will abort/stop any current go_to or generate_scan, identify the safest possible path to North or South (without changing elevation, if possible), and perform the moves to get there. """ state = 'init' last_state = state session.data = {'state': state, 'timestamp': time.time()} while session.status in ['starting', 'running'] and state not in ['escape-done']: az, el = [self.data['status']['summary'][f'{ax}_current_position'] for ax in ['Azimuth', 'Elevation']] if state == 'init': state = 'escape-abort' elif state == 'escape-abort': # raise stop flags and issue stop on motion ops for op in ['generate_scan', 'go_to']: self.agent.stop(op) self.agent.abort(op) state = 'escape-wait-idle' timeout = 30 elif state == 'escape-wait-idle': for op in ['generate_scan', 'go_to']: ok, msg, _session = self.agent.status(op) if _session.get('status', 'done') != 'done': break else: state = 'escape-move' last_move = time.time() timeout -= 1 if timeout < 0: state = 'escape-stop' elif state == 'escape-stop': yield self._stop() state = 'escape-move' last_move = time.time() elif state == 'escape-move': self.log.info('Getting escape path for (t, az, el) = ' '(%.1f, %.3f, %.3f)' % (time.time(), az, el)) escape_path = self.sun.find_escape_paths(az, el) if escape_path is None: self.log.error('Failed to find acceptable path; using ' 'failsafe (South, low el).') legs = [(180., max(self.sun_params['policy']['min_el'], 0))] else: legs = escape_path['moves'].nodes[1:] self.log.info('Escaping to (az, el)={pos} ({n} moves)', pos=legs[-1], n=len(legs)) state = 'escape-move-legs' leg_d = None elif state == 'escape-move-legs': def _leg_done(result): nonlocal state, last_move, leg_d all_ok, msg = result if not all_ok: self.log.error('Leg failed.') # Recompute the escape path. if time.time() - last_move > 60: self.log.error('Too many failures -- giving up for now') state = 'escape-done' else: state = 'escape-move' else: leg_d = None last_move = time.time() if not self._get_sun_policy('escape_enabled'): state = 'escape-done' if leg_d is None: if len(legs) == 0: state = 'escape-done' else: leg_az, leg_el = legs.pop(0) leg_d = self._go_to_axes(session, az=leg_az, el=leg_el, clear_faults=True) leg_d.addCallback(_leg_done) elif state == 'escape-done': # This block won't run -- loop will exit. pass session.data['state'] = state if state != last_state: self.log.info('escape_sun_now: state is now "{state}"', state=state) last_state = state yield dsleep(1) return True, "Exited."
def _check_scan_sunsafe(self, az1, az2, el, v_az, a_az): """This will return True if active avoidance is disabled. If active avoidance is enabled, then it will only return true if the planned scan seems to currently be sun-safe. """ if not self._get_sun_policy('sunsafe_moves'): return True, 'Sun-safety checking is not enabled.' if not self._get_sun_policy('map_valid'): return False, 'Sun Safety Map not computed or stale; run the monitor_sun process.' # Include a bit of buffer for turn-arounds. az1, az2 = min(az1, az2), max(az1, az2) turn = v_az**2 / a_az az1 -= turn az2 += turn n = max(2, int(np.ceil((az2 - az1) / 1.))) azs = np.linspace(az1, az2, n) info = self.sun.check_trajectory(azs, azs * 0 + el) safe = info['sun_time'] >= self.sun_params['policy']['min_sun_time'] if safe: msg = 'Scan is safe for %.1f hours' % (info['sun_time'] / 3600) else: msg = 'Scan will be unsafe in %.1f hours' % (info['sun_time'] / 3600) return safe, msg def _get_sunsafe_moves(self, target_az, target_el, zero_legs_ok=True): """Given a target position, find a Sun-safe way to get there. This will either be a direct move, or else an ordered slew in az before el (or vice versa). Returns (legs, msg). If legs is None, it indicates that no Sun-safe path could be found; msg is an error message. If a path can be found, the legs is a list of intermediate move targets, ``[(az0, el0), (az1, el1) ...]``, terminating on ``(target_az, target_el)``. msg is None in that case. In the case that platform is already at the target position, an empty list of legs will be returned unless zero_legs_ok is False in which case a 1-entry list of legs is returned, with the target position in it. When Sun avoidance is not enabled, this function returns as though the direct path to the target is a safe one (though axes_sequential=True may turn this into a move with two legs). """ # Get current position. try: az, el = [self.data['status']['summary'][f'{ax}_current_position'] for ax in ['Azimuth', 'Elevation']] if az is None or el is None: raise KeyError except KeyError: return None, 'Current position could not be determined.' if not self._get_sun_policy('sunsafe_moves'): if self.motion_limits.get('axes_sequential'): # Move in az first, then el. return [(target_az, el), (target_az, target_el)], None return [(target_az, target_el)], None if not self._get_sun_policy('map_valid'): return None, 'Sun Safety Map not computed or stale; run the monitor_sun process.' # Check the target position and block it outright. if self.sun.check_trajectory([target_az], [target_el])['sun_time'] <= 0: return None, 'Requested target position is not Sun-Safe.' moves = self.sun.analyze_paths(az, el, target_az, target_el) move, decisions = self.sun.select_move(moves) if move is None: return None, 'No Sun-Safe moves could be identified!' legs = list(move['moves'].nodes) if len(legs) == 1 and not zero_legs_ok: return legs, None return legs[1:], None
[docs] @ocs_agent.param('starting_index', type=int, default=0) def exercise(self, session, params): """exercise(starting_index=0) **Process** - Run telescope platform through some pre-defined motions. For historical reasons, this does not command agent functions internally, but rather instantiates a *client* and calls the agent as though it were an external entity. """ # Load the exercise plan. plans = yaml.safe_load(open(self.exercise_plan, 'rb')) super_plan = exercisor.get_plan(plans[self.acu_config_name]) session.data = { 'timestamp': time.time(), 'iterations': 0, 'attempts': 0, 'errors': 0, } def _publish_activity(activity): msg = { 'block_name': 'A', 'timestamp': time.time(), 'data': {'activity': activity}, } self.agent.publish_to_feed('activity', msg) def _publish_error(delta_error=1): session.data['errors'] += delta_error msg = { 'block_name': 'B', 'timestamp': time.time(), 'data': {'error_count': session.data['errors']} } self.agent.publish_to_feed('activity', msg) def _exit_now(ok, msg): _publish_activity('idle') self.agent.feeds['activity'].flush_buffer() return ok, msg _publish_activity('idle') _publish_error(0) target_instance_id = self.agent.agent_address.split('.')[-1] exercisor.set_client(target_instance_id, self.agent.site_args) settings = super_plan.get('settings', {}) plan_idx = 0 plan_t = None for plan in super_plan['steps']: plan['iter'] = iter(plan['driver']) while session.status in ['running']: time.sleep(1) session.data['timestamp'] = time.time() session.data['iterations'] += 1 # Fault maintenance faults = exercisor.get_faults() if faults['safe_lock']: self.log.info('SAFE lock detected, exiting') return _exit_now(False, 'Exiting on SAFE lock.') if faults['local_mode']: self.log.info('LOCAL mode detected, exiting') return _exit_now(False, 'Exiting on LOCAL mode.') if faults['az_summary']: if session.data['attempts'] > 5: self.log.info('Too many az summary faults, exiting.') return _exit_now(False, 'Too many az summary faults.') session.data['attempts'] += 1 self.log.info('az summary fault -- trying to clear.') exercisor.clear_faults() time.sleep(10) continue session.data['attempts'] = 0 # Plan execution active_plan = super_plan['steps'][plan_idx] if plan_t is None: plan_t = time.time() now = time.time() if now - plan_t > active_plan['duration']: plan_idx = (plan_idx + 1) % len(super_plan['steps']) plan_t = None continue if settings.get('use_boresight'): bore_target = random.choice(settings['boresight_opts']) self.log.info(f'Setting boresight={bore_target}...') _publish_activity('boresight') exercisor.set_boresight(bore_target) plan, info = next(active_plan['iter']) self.log.info('Launching next scan. plan={plan}', plan=plan) _publish_activity(active_plan['driver'].code) ok = None if 'targets' in plan: exercisor.steps(**plan) else: exercisor.scan(**plan) _publish_activity('idle') if ok is None: self.log.info('Scan completed without error.') else: self.log.info(f'Scan exited with error: {ok}') _publish_error() return _exit_now(True, "Stopped run process")
def add_agent_args(parser_in=None): if parser_in is None: parser_in = argparse.ArgumentParser() pgroup = parser_in.add_argument_group('Agent Options') pgroup.add_argument("--acu-config") pgroup.add_argument("--exercise-plan") pgroup.add_argument("--no-processes", action='store_true', default=False) pgroup.add_argument("--ignore-axes", choices=['el', 'az', 'third'], nargs='+', help="One or more axes to ignore.") pgroup.add_argument("--disable-idle-reset", action='store_true', help="Disable idle_reset, even for LAT.") pgroup.add_argument("--min-el", type=float, help="Override the minimum el defined in platform config.") pgroup.add_argument("--max-el", type=float, help="Override the maximum el defined in platform config.") pgroup.add_argument("--avoid-sun", type=int, help="Pass 0 or 1 to disable or enable Sun avoidance. " "Overrides the platform default config.") pgroup.add_argument("--fov-radius", type=float, help="Override the default field-of-view (radius in " "degrees) for Sun avoidance purposes.") pgroup.add_argument("--named-positions", nargs='+', help="Define named positions, for go_to_named; e.g. 'home=180,60'.") return parser_in def main(args=None): parser = add_agent_args() args = site_config.parse_args(agent_class='ACUAgent', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) _ = ACUAgent(agent, args.acu_config, args.exercise_plan, startup=not args.no_processes, ignore_axes=args.ignore_axes, disable_idle_reset=args.disable_idle_reset, avoid_sun=args.avoid_sun, fov_radius=args.fov_radius, min_el=args.min_el, max_el=args.max_el, named_positions=args.named_positions) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()