import argparse
import random
import struct
import time
from enum import Enum
import numpy as np
import ocs
import soaculib as aculib
import soaculib.status_keys as status_keys
import twisted.web.client as tclient
import yaml
from autobahn.twisted.util import sleep as dsleep
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from soaculib.retwisted_backend import RetwistedHttpBackend
from soaculib.twisted_backend import TwistedHttpBackend
from twisted.internet import protocol, reactor, threads
from twisted.internet.defer import DeferredList, inlineCallbacks
from socs.agents.acu import avoidance
from socs.agents.acu import drivers as sh
from socs.agents.acu import exercisor, hvac, hwp_iface
#: The number of free ProgramTrack positions, when stack is empty.
FULL_STACK = 10000
#: Initial default scan params by platform type.
INIT_DEFAULT_SCAN_PARAMS = {
'ccat': {
'az_speed': 2,
'az_accel': 1,
'el_freq': .15,
'turnaround_method': 'standard',
'el_mode': None,
},
'satp': {
'az_speed': 1,
'az_accel': 1,
'el_freq': 0,
'turnaround_method': 'standard',
'el_mode': None,
},
}
#: Default Sun avoidance configuration blocks, by platform type.
#: Individual settings can be overridden in the platform config file.
#: The full "policy" is constructed from these settings, the
#: motion_limits, and the DEFAULT_POLICY in avoidance.py. When
#: active Sun Avoidance is not enabled, policy parameters are still
#: useful for assessing Sun safety.
INIT_SUN_CONFIGS = {
'ccat': {
'enabled': False,
'exclusion_radius': 20,
'el_horizon': 0,
'el_dodging': True,
'min_sun_time': 1800,
'response_time': 7200,
},
'satp': {
'enabled': True,
'exclusion_radius': 20,
'el_horizon': 10,
'min_sun_time': 1800,
'response_time': 7200,
},
}
#: How often to refresh to Sun Safety map (valid up to 2x this time)
SUN_MAP_REFRESH = 6 * avoidance.HOUR
#: Things the ACU might say when you've done things properly
OK_RESPONSES = [
b'OK, Command executed.',
b'OK, Command send.',
]
# The data structure below defines how data fields are organized in
# the "status" readout of the "monitor" process. Each entry in the
# list is a tuple:
#
# (block_name, fields_key, policy, sample_period)
#
# The "block_name" is the block name in the sense of housekeeping data
# format. The "fields_key" corresponds a labeled group of ACU fields,
# as defined in soaculib.status_keys. The "policy" is a description,
# for the monitor process, of how to store the data. The values for
# policy are:
#
# - None: publish the data at least every X seconds (see
# MONITOR_MAX_TIME_DELTA), or if any of the values have changed.
# - 'tick': publish every sample (as a reference tick).
# - 'changed': publish only when values change (otherwise, stale).
#
# The sample_period is the minimum spacing between samples, even if
# values have changed.
#
# Note that *all groups found in soaculib.status_keys* must be listed
# here -- or an error will be raised on startup. To drop a block of
# data (as defined by fields_key) from the output data, set the
# block_name to None.
#: Block names and update policy for status fields in monitor process.
MONITOR_STRUCTURE = [
('ACU_summary_output', 'summary', 'tick', None),
('ACU_axis_faults', 'axis_faults_errors_overages', None, None),
('ACU_position_errors', 'position_errors', None, None),
('ACU_axis_limits', 'axis_limits', None, None),
('ACU_axis_warnings', 'axis_warnings', None, None),
('ACU_axis_failures', 'axis_failures', None, None),
('ACU_axis_state', 'axis_state', None, None),
('ACU_oscillation_alarm', 'osc_alarms', None, None),
('ACU_command_status', 'commands', None, None),
('ACU_general_errors', 'ACU_failures_errors', None, None),
('ACU_platform_status', 'platform_status', None, None),
('ACU_emergency', 'ACU_emergency', None, None),
('ACU_corotator', 'corotator', None, None),
('ACU_shutter', 'shutter', None, None),
('ACU_tilt', 'tilt_slow', 'changed', 0.5),
(None, 'tilt_fast', None, None),
('ACU_sun_avoidance', 'sun_avoidance', None, 1.),
('ACU_corrections', 'corrections', None, 10.),
('ACU_hvac_data', 'hvac_data', None, 10.),
('ACU_hvac_ctrl', 'hvac_ctrl', None, None),
('ACU_hvac_faults', 'hvac_faults', None, None),
]
#: Maximum update time (in s) for "monitor" process data, even with no changes
MONITOR_MAX_TIME_DELTA = 2.
[docs]
class ACUAgent:
"""Agent to acquire data from an ACU and control telescope pointing with the
ACU.
Parameters:
acu_config (str):
The configuration for the ACU, as referenced in aculib.configs.
Default value is 'guess'.
startup (bool):
If True, immediately start the main monitoring processes
for status and UDP data.
ignore_axes (list of str):
List of axes to "ignore". "ignore" means that the axis
will not be commanded. If a user requests an action that
would otherwise move the axis, it is not moved but the
action is assumed to have succeeded. The values in this
list should be drawn from "az", "el", "third", and "none".
This argument *replaces* the setting from the config file.
("none" entries will simply be ignored.)
disable_idle_reset (bool):
If True, don't auto-start idle_reset process for LAT.
disable_sun_avoidance (bool): If set, start up with Sun
Avoidance completely disabled.
disable_hwp_interlocks (bool): If set, start up with HWP
Interlocks disabled.
min_el (float): If not None, override the default configured
elevation lower limit.
max_el (float): If not None, override the default configured
elevation upper limit.
"""
def __init__(self, agent, acu_config='guess',
startup=False, ignore_axes=None,
disable_idle_reset=False,
disable_sun_avoidance=False,
disable_hwp_interlocks=False,
min_el=None, max_el=None,
):
# Agent support
self.agent = agent
self.log = agent.log
# Separate locks for exclusive access to az/el, and boresight motions.
self.azel_lock = TimeoutLock()
self.boresight_lock = TimeoutLock()
# Config file processing
self.acu_config_name = acu_config
self.acu_config = aculib.guess_config(acu_config)
self.platform_type = self.acu_config['platform'] # ccat, satp.
# List of datasets to read as "status". The 'status' dataset
# is necessary; the 'third' axis can be None (in SATP all the
# boresight info is included in the status dataset); the
# 'shutter' is for LAT shutter and 'pointing' is for LAT
# tiltmeter.
_dsets = self.acu_config['_datasets']
self.datasets = {
'status': _dsets.get('default_dataset'),
'third': _dsets.get('third_axis_dataset'),
'shutter': _dsets.get('shutter_dataset'),
'pointing': _dsets.get('pointing_dataset'),
'hvac': _dsets.get('hvac_dataset'),
}
for k, v in self.datasets.items():
if v is not None:
self.datasets[k] = dict(_dsets['datasets'])[v]
# Create a map from each status key (read through the
# self.datasets) to the output block and field name.
self.status_field_map = {}
for group, group_fields in \
status_keys.status_fields[self.platform_type]['status_fields'].items():
for block_name, block_group, _, _ in MONITOR_STRUCTURE:
if block_group == group:
break
else:
raise ValueError(f"status_key block '{group}' not found in MONITOR_STRUCTURE.")
for acu_key, block_key in group_fields.items():
self.status_field_map[acu_key] = (group, block_name, block_key)
# Config file + overrides processing
# Motion limits (az / el / third ranges).
self.motion_limits = self.acu_config['motion_limits']
if min_el:
self.log.warn(f'Override: min_el={min_el}')
self.motion_limits['elevation']['lower'] = min_el
if max_el:
self.log.warn(f'Override: max_el={max_el}')
self.motion_limits['elevation']['upper'] = max_el
# Sun avoidance (must be set up *after* finalizing motion limits)
self.sun_config = INIT_SUN_CONFIGS[self.platform_type]
self.sun_config.update(self.acu_config.get('sun_avoidance', {}))
if disable_sun_avoidance:
self.sun_config['enabled'] = False
self.log.info('On startup, sun_config={sun_config}',
sun_config=self.sun_config)
self._reset_sun_params()
# Scan params (default vel / accel / el freq).
self.default_scan_params = \
dict(INIT_DEFAULT_SCAN_PARAMS[self.platform_type])
for _k in self.default_scan_params.keys():
_v = self.acu_config.get('scan_params', {}).get(_k)
if _v is not None:
self.default_scan_params[_k] = _v
agent.log.info('On startup, default scan_params={scan_params}',
scan_params=self.default_scan_params)
self.scan_params = dict(self.default_scan_params)
# Axes to ignore.
self.ignore_axes = self.acu_config.get('ignore_axes', [])
if ignore_axes is not None:
self.ignore_axes = [x for x in ignore_axes if x != 'none']
if len(self.ignore_axes):
assert all([x in ['az', 'el', 'third'] for x in self.ignore_axes])
agent.log.warn('Note ignore_axes={i}', i=self.ignore_axes)
# Named positions.
self.named_positions = self.acu_config.get('named_positions', {})
for k, v in self.named_positions.items():
agent.log.info(f'Using named position {k}: {v[0]},{v[1]}')
try:
str(k), float(v[0]), float(v[1])
except Exception:
agent.log.error('Failed to parse named position "{k}"', k=k)
# HWP interlocks.
self.hwp_rules = hwp_iface.HWPInterlocks.from_dict(
self.acu_config.get('hwp_interlocks'))
if disable_hwp_interlocks:
self.hwp_rules.enabled = False
startup_monitor_hwp = (startup and self.hwp_rules.configured)
# Exercise plan.
self.exercise_plan = self.acu_config.get('exercise_plan')
# Other flags.
startup_idle_reset = (self.platform_type in ['lat', 'ccat']
and not disable_idle_reset)
# The connections to the ACU.
tclient._HTTP11ClientFactory.noisy = False
self.acu_control = aculib.AcuControl(
acu_config, backend=RetwistedHttpBackend(persistent=False))
self.acu_read = aculib.AcuControl(
acu_config, backend=TwistedHttpBackend(persistent=True), readonly=True)
# Structures for passing status data around
# self.data provides a place to reference data from the monitors.
# 'status' is populated by the monitor operation
# 'broadcast' is populated by the udp_monitor operation
# 'hwp' is populated by the monitor_hwp operation
self.data = {
'status': {},
'broadcast': {},
'hwp': {},
}
for b, k, _, _ in MONITOR_STRUCTURE:
if b is None:
continue
self.data['status'][k] = {}
# Structure for the broadcast process to communicate state to
# the monitor process, for a data quality feed.
self._broadcast_qual = {
'timestamp': time.time(),
'active': False,
'time_offset': 0,
}
# Task, Process, Feed registration.
agent.register_process('monitor',
self.monitor,
self._simple_process_stop,
blocking=False,
startup=startup)
agent.register_process('broadcast',
self.broadcast,
self._simple_process_stop,
blocking=False,
startup=startup)
if 'ext' in self.acu_control.streams:
agent.register_process('broadcast_ext',
self.broadcast_ext,
self._simple_process_stop,
blocking=False,
startup=False)
agent.register_process('monitor_sun',
self.monitor_sun,
self._simple_process_stop,
blocking=False,
startup=startup)
agent.register_process('monitor_hwp',
self.monitor_hwp,
self._simple_process_stop,
blocking=False,
startup=startup_monitor_hwp)
agent.register_process('generate_scan',
self.generate_scan,
self._simple_process_stop,
blocking=False,
startup=False)
agent.register_process('idle_reset',
self.idle_reset,
self._simple_process_stop,
blocking=False,
startup=startup_idle_reset)
agent.register_process('fromfile_scan',
self.fromfile_scan,
self._simple_process_stop,
blocking=False)
basic_agg_params = {'frame_length': 60}
fullstatus_agg_params = {'frame_length': 60,
'exclude_influx': True,
'exclude_aggregator': False
}
influx_agg_params = {'frame_length': 60,
'exclude_influx': False,
'exclude_aggregator': True
}
agent.register_feed('acu_status',
record=True,
agg_params=fullstatus_agg_params,
buffer_time=1)
agent.register_feed('acu_status_influx',
record=True,
agg_params=influx_agg_params,
buffer_time=1)
agent.register_feed('acu_commands_influx',
record=True,
agg_params=influx_agg_params,
buffer_time=1)
agent.register_feed('acu_udp_stream',
record=True,
agg_params=fullstatus_agg_params,
buffer_time=1)
agent.register_feed('acu_broadcast_influx',
record=True,
agg_params=influx_agg_params,
buffer_time=1)
if 'ext' in self.acu_control.streams:
agent.register_feed('acu_ext_stream',
record=True,
agg_params=fullstatus_agg_params,
buffer_time=1)
agent.register_feed('acu_ext_influx',
record=True,
agg_params=influx_agg_params,
buffer_time=1)
agent.register_feed('acu_error',
record=True,
agg_params=basic_agg_params,
buffer_time=1)
agent.register_feed('sun',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
agent.register_feed('data_qual',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
agent.register_feed('scan_params',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
agent.register_task('go_to',
self.go_to,
blocking=False,
aborter=self._simple_task_abort)
agent.register_task('go_to_named',
self.go_to_named,
blocking=False)
agent.register_task('set_scan_params',
self.set_scan_params,
blocking=False)
agent.register_task('set_boresight',
self.set_boresight,
blocking=False,
aborter=self._simple_task_abort)
agent.register_task('set_speed_mode',
self.set_speed_mode,
blocking=False)
agent.register_task('stop_and_clear',
self.stop_and_clear,
blocking=False)
agent.register_task('clear_faults',
self.clear_faults,
blocking=False)
agent.register_task('special_action',
self.special_action,
blocking=False,
aborter=self._simple_task_abort)
agent.register_task('update_sun',
self.update_sun,
blocking=False)
agent.register_task('escape_sun_now',
self.escape_sun_now,
blocking=False,
aborter=self._simple_task_abort)
if self.datasets['shutter']:
agent.register_task('set_shutter',
self.set_shutter,
blocking=False,
aborter=self._simple_task_abort)
agent.register_task('update_hwp',
self.update_hwp,
blocking=False)
# Automatic exercise program...
if self.exercise_plan:
agent.register_process(
'exercise', self.exercise, self._simple_process_stop,
stopper_blocking=False)
# Use longer default frame length ... very low volume feed.
agent.register_feed('activity',
record=True,
buffer_time=0,
agg_params={
'frame_length': 600,
})
@inlineCallbacks
def _simple_task_abort(self, session, params):
# Trigger a task abort by updating state to "stopping"
yield session.set_status('stopping')
@inlineCallbacks
def _simple_process_stop(self, session, params):
# Trigger a process stop by updating state to "stopping"
yield session.set_status('stopping')
[docs]
@ocs_agent.param('_')
@inlineCallbacks
def idle_reset(self, session, params):
"""idle_reset()
**Process** - To prevent LAT from going into Survival mode,
do something on the command interface every so often. (The
default inactivity timeout is 1 minute.)
"""
IDLE_RESET_TIMEOUT = 60 # The watchdog timeout in ACU
next_action = 0
while session.status in ['starting', 'running']:
if time.time() < next_action:
yield dsleep(IDLE_RESET_TIMEOUT / 10)
continue
success = True
try:
yield self.acu_control.http.Values(self.datasets['status'])
except Exception as e:
self.log.info(' -- failed to reset Idle Stow time: {err}', err=e)
success = False
session.data.update({
'timestamp': time.time(),
'reset_ok': success})
if not success:
next_action = time.time() + 4
else:
next_action = time.time() + IDLE_RESET_TIMEOUT / 2
return True, 'Process "idle_reset" exited cleanly.'
[docs]
@inlineCallbacks
def monitor(self, session, params):
"""monitor()
**Process** - Refresh the cache of SATP ACU status information and
report it on the 'acu_status' and 'acu_status_influx' HK feeds.
Summary parameters are ACU-provided time code, Azimuth mode,
Azimuth position, Azimuth velocity, Elevation mode, Elevation position,
Elevation velocity, Boresight mode, and Boresight position.
The session.data of this process is a nested dictionary.
Here's an example::
{
"StatusDetailed": {
"Time": 81.661170959322,
"Year": 2023,
"Azimuth mode": "Stop",
"Azimuth commanded position": -20.0012,
"Azimuth current position": -20.0012,
"Azimuth current velocity": 0.0002,
"Azimuth average position error": 0,
"Azimuth peak position error": 0,
"Azimuth computer disabled": false,
...
},
"Status3rdAxis": {
"3rd axis Mode": "Stop",
"3rd axis commanded position": 77,
"3rd axis current position": 77,
"3rd axis computer disabled": "No Fault",
...
},
"StatusShutter": {
"Shutter Closed": false,
...
},
"Hvac": {
"Booster EL Housing Failure": false,
"Booster EL Housing on": false,
...
},
"StatusResponseRate": 19.237531827325963,
"PlatformType": "satp",
"IgnoredAxes": [],
"NamedPositions": {
"home": [
180,
40
]
},
"DefaultScanParams": {
"az_speed": 2.0,
"az_accel": 1.0,
},
"connected": True,
}
Differences between SATP and LAT structures:
- The PlatformType reports "satp" for SATP and "ccat" for LAT.
- In the case of an SATP, the Status3rdAxis is not populated;
the Boresight info can be found in StatusDetailed. In the
case of the LAT, the corotator info is queried separately
and stored under Status3rdAxis.
- The StatusShutter and Hvac entries will be populated for the
LAT, but empty for SATP.
"""
# Note that session.data will get scanned, to assign data to
# feed blocks. We make an explicit list of items to ignore
# during that scan (not_data_keys).
session.data = {'PlatformType': self.acu_config['platform'],
'DefaultScanParams': self.scan_params,
'StatusResponseRate': 0.,
'IgnoredAxes': self.ignore_axes,
'NamedPositions': self.named_positions,
'connected': False}
not_data_keys = list(session.data.keys())
last_complaint = 0
while True:
try:
version = yield self.acu_read.http.Version()
break
except Exception as e:
if time.time() - last_complaint > 3600:
errormsg = {'aculib_error_message': str(e)}
self.log.error(str(e))
self.log.error('monitor process failed to query version! Will keep trying.')
last_complaint = time.time()
yield dsleep(10)
self.log.info(version)
session.data['connected'] = True
# Numbering as per ICD.
mode_key = {
'Stop': 0,
'Preset': 1,
'ProgramTrack': 2,
'Rate': 3,
'SectorScan': 4,
'SearchSpiral': 5,
'SurvivalMode': 6,
'StepTrack': 7,
'GeoSync': 8,
'OPT': 9,
'TLE': 10,
'Stow': 11,
'StarTrack': 12,
'SunTrack': 13,
'MoonTrack': 14,
'I11P': 15,
'AutoTrack/Preset': 16,
'AutoTrack/PositionMemory': 17,
'AutoTrack/PT': 18,
'AutoTrack/OPT': 19,
'AutoTrack/PT/Search': 20,
'AutoTrack/TLE': 21,
'AutoTrack/TLE/Search': 22,
# Currently we do not have ICD values for these, but they
# are included in the output of Meta. ElSync, at least,
# is a known third axis mode for the LAT.
'ElSync': 100,
'UnStow': 101,
'MaintenanceStow': 102,
}
# fault_key digital values taken from ICD (correspond to byte-encoding)
fault_key = {
'No Fault': 0,
'Warning': 1,
'Fault': 2,
'Critical': 3,
'No Data': 4,
'Latched Fault': 5,
'Latched Critical Fault': 6,
}
pin_key = {
# Capitalization matches strings in ACU binary, not ICD.
# Are these needed for the SAT still?
'Any Moving': 0,
'All Inserted': 1,
'All Retracted': 2,
'Failure': 3,
}
lat_pin_key = {
# From "meta" output.
'Moving': 0,
'Inserted': 1,
'Retracted': 2,
'Error': 3,
}
tfn_key = {'None': float('nan'),
'False': 0,
'True': 1,
}
report_t = time.time()
report_period = 20
n_ok = 0
min_query_period = 0.05 # Seconds
query_t = 0
# Assist monitoring and logging changes in certain fields.
checkdata = [
('summary', 'ctime'),
('platform_status', 'Remote_mode'),
('summary', 'Azimuth_mode'),
('summary', 'Elevation_mode'),
('summary', 'Boresight_mode'),
('corotator', 'Corotator_mode'),
]
prev_checkdata = {k: None for g, k in checkdata}
@inlineCallbacks
def _get_status():
output = {}
for short, collection in [
('status', 'StatusDetailed'),
('third', 'Status3rdAxis'),
('shutter', 'StatusShutter'),
('pointing', 'CmdPointingCorrection'),
('hvac', 'Hvac'),
]:
if self.datasets[short]:
output[collection] = (
yield self.acu_read.Values(self.datasets[short]))
else:
output[collection] = {}
return output
session.data['StatusResponseRate'] = n_ok / (query_t - report_t)
session.data.update((yield _get_status()))
qual_pacer = Pacemaker(.1)
hvm = hvac.HvacManager()
last_resp_rate = None
data_blocks = {}
influx_blocks = {}
unknown_fields = set()
while session.status in ['running']:
now = time.time()
if now - query_t < min_query_period:
yield dsleep(min_query_period - (now - query_t))
query_t = time.time()
if query_t > report_t + report_period:
resp_rate = n_ok / (query_t - report_t)
if last_resp_rate is None or (abs(resp_rate - last_resp_rate)
> max(0.1, last_resp_rate * .01)):
self.log.info('Data rate for "monitor" stream is now %.3f Hz' % (resp_rate))
last_resp_rate = resp_rate
report_t = query_t
n_ok = 0
session.data.update({'StatusResponseRate': resp_rate})
if qual_pacer.next_sample <= time.time():
# Publish UDP data health feed
qual_pacer.sleep() # should be instantaneous, just update counters
bq = self._broadcast_qual
bq_offset = bq['time_offset']
if bq_offset is None:
bq_offset = 0.
bq_ok = (bq['active'] and (now - bq['timestamp'] < 5)
and abs(bq_offset) < 1.)
block = {
'timestamp': time.time(),
'block_name': 'qual0',
'data': {
'Broadcast_stream_ok': int(bq_ok),
'Broadcast_recv_offset': bq_offset,
}
}
self.agent.publish_to_feed('data_qual', block)
try:
session.data.update((yield _get_status()))
session.data['connected'] = True
n_ok += 1
last_complaint = 0
except Exception as e:
if now - last_complaint > 3600:
errormsg = {'aculib_error_message': str(e)}
self.log.error(str(e))
acu_error = {'timestamp': time.time(),
'block_name': 'ACU_error',
'data': errormsg
}
self.agent.publish_to_feed('acu_error', acu_error)
last_complaint = time.time()
session.data['connected'] = False
yield dsleep(1)
continue
for k, v in session.data.items():
if k in not_data_keys:
continue
if k == 'Hvac' and len(v) > 0 and (hvm.grouped_fields is None):
# Runs when first HVAC data are received. These
# fields aren't listed explicitly in soaculib so
# they're analyzed here.
hvm.parse_fields(v)
assert len(hvm.grouped_fields['unclassified']) == 0
self.status_field_map.update(hvm.get_block_info())
for (key, value) in v.items():
try:
group, block, field = self.status_field_map[key]
except KeyError:
if key not in unknown_fields:
self.log.warn(
'unknown status field (ignored hereafter): "%s"' % key)
unknown_fields.add(key)
continue
if block is None:
continue
# Cast value to saveable type.
if isinstance(value, bool):
value = int(value)
elif isinstance(value, int) or isinstance(value, float):
pass
elif value is None:
value = float('nan')
else:
value = str(value)
# Store.
self.data['status'][group][field] = value
self.data['status']['summary']['ctime'] = \
sh.timecode(self.data['status']['summary']['Time'])
# Check for state changes in some key fields.
new_checkdata = {k: self.data['status'][g].get(k)
for g, k in checkdata}
if new_checkdata['Remote_mode'] != prev_checkdata['Remote_mode']:
if new_checkdata['Remote_mode']:
self.log.warn('ACU now in remote mode.')
else:
self.log.warn('ACU in local mode!')
for axis_mode, v in new_checkdata.items():
if 'mode' not in axis_mode or 'Remote' in axis_mode:
continue
if v != prev_checkdata[axis_mode]:
self.log.info('{axis_mode} is now "{v}"',
axis_mode=axis_mode, v=v)
if new_checkdata['ctime'] == prev_checkdata['ctime']:
self.log.warn('ACU time has not changed from previous data point!')
continue
prev_checkdata = new_checkdata
# influx_blocks are constructed based on refers to all
# other self.data['status'] keys. Do not add more keys to
# any self.data['status'] categories beyond this point
new_influx_blocks = {}
for category in self.data['status']:
new_influx_blocks[category] = {
'timestamp': self.data['status']['summary']['ctime'],
'block_name': category,
'data': {}}
if category != 'commands':
for statkey, statval in self.data['status'][category].items():
if isinstance(statval, float):
influx_val = statval
elif isinstance(statval, str):
for key_map in [tfn_key, mode_key, fault_key, pin_key,
lat_pin_key]:
if statval in key_map:
influx_val = key_map[statval]
break
else:
raise ValueError('Could not convert value for %s="%s"' %
(statkey, statval))
elif isinstance(statval, int):
if statkey in ['Year', 'Free_upload_positions']:
influx_val = float(statval)
else:
influx_val = int(statval)
new_influx_blocks[category]['data'][statkey + '_influx'] = influx_val
else: # i.e. category == 'commands':
if str(self.data['status']['commands']['Azimuth_commanded_position']) != 'nan':
acucommand_az = {'timestamp': self.data['status']['summary']['ctime'],
'block_name': 'ACU_commanded_positions_az',
'data': {'Azimuth_commanded_position_influx': self.data['status']['commands']['Azimuth_commanded_position']}
}
self.agent.publish_to_feed('acu_commands_influx', acucommand_az)
if str(self.data['status']['commands']['Elevation_commanded_position']) != 'nan':
acucommand_el = {'timestamp': self.data['status']['summary']['ctime'],
'block_name': 'ACU_commanded_positions_el',
'data': {'Elevation_commanded_position_influx': self.data['status']['commands']['Elevation_commanded_position']}
}
self.agent.publish_to_feed('acu_commands_influx', acucommand_el)
if self.acu_config['platform'] == 'satp':
if str(self.data['status']['commands']['Boresight_commanded_position']) != 'nan':
acucommand_bs = {'timestamp': self.data['status']['summary']['ctime'],
'block_name': 'ACU_commanded_positions_boresight',
'data': {'Boresight_commanded_position_influx': self.data['status']['commands']['Boresight_commanded_position']}
}
self.agent.publish_to_feed('acu_commands_influx', acucommand_bs)
# Only keep blocks that have changed or have new data.
block_keys = list(new_influx_blocks.keys())
for k in block_keys:
if k not in influx_blocks:
continue
B, N = influx_blocks[k], new_influx_blocks[k]
overdue = (N['timestamp'] - B['timestamp'] > MONITOR_MAX_TIME_DELTA)
changes = any([B['data'][_k] != _v for _k, _v in N['data'].items()])
if overdue or changes:
continue
del new_influx_blocks[k]
for block in new_influx_blocks.values():
# Check that we have data (commands and corotator often don't)
if len(block['data']) > 0:
self.agent.publish_to_feed('acu_status_influx', block)
influx_blocks.update(new_influx_blocks)
# Assemble data for aggregator ...
new_blocks = {}
for block_name, data_key, _, _ in MONITOR_STRUCTURE:
if block_name is None:
continue
new_blocks[block_name] = {
'timestamp': self.data['status']['summary']['ctime'],
'block_name': block_name,
'data': self.data['status'][data_key],
}
# Only keep blocks that have changed or have new data.
for k, _, policy, delta in MONITOR_STRUCTURE:
if k is None:
continue
B, N = data_blocks.get(k), new_blocks[k]
if len(N['data']) == 0:
del new_blocks[k]
continue
if B is None:
continue
if policy == 'tick': # always store.
continue
underdue = delta is not None and \
(N['timestamp'] - B['timestamp'] < delta)
overdue = (N['timestamp'] - B['timestamp'] > MONITOR_MAX_TIME_DELTA) \
and not underdue
changes = any([B['data'][_k] != _v for _k, _v in N['data'].items()])
if (overdue and policy != 'changed') or changes and not underdue:
continue
del new_blocks[k]
for block in new_blocks.values():
self.agent.publish_to_feed('acu_status', block)
data_blocks.update(new_blocks)
return True, 'Acquisition exited cleanly.'
[docs]
@ocs_agent.param('auto_enable', type=bool, default=True)
def broadcast(self, session, params):
"""broadcast(auto_enable=True)
**Process** - Read UDP data from the port specified by
self.acu_config, decode it, and publish to HK feeds. Full
resolution (200 Hz) data are written to feed "acu_udp_stream"
while 1 Hz decimated are written to "acu_broadcast_influx".
The 1 Hz decimated output are also stored in session.data.
Args:
auto_enable (bool): If True, the Process will try to
configure and (re-)enable the UDP stream if at any point
the stream seems to drop out.
Notes:
The session.data looks like this (this is for a SATP running
with servo details in the UDP output)::
{
"Time": 1679499948.8234625,
"Corrected_Azimuth": -20.00112176010607,
"Corrected_Elevation": 50.011521050839434,
"Corrected_Boresight": 29.998428712246067,
"Raw_Azimuth": -20.00112176010607,
"Raw_Elevation": 50.011521050839434,
"Raw_Boresight": 29.998428712246067,
"Azimuth_Current_1": -0.000384521484375,
"Azimuth_Current_2": -0.0008331298828125,
"Elevation_Current_1": 0.003397979736328125,
"Boresight_Current_1": -0.000483856201171875,
"Boresight_Current_2": -0.000105743408203125,
"Azimuth_Vel_1": -0.000002288818359375,
"Azimuth_Vel_2": 0,
"Az_Vel_Act": -0.0000011444091796875,
"Az_Vel_Des": 0,
"Az_Vffw": 0,
"Az_Pos_Des": -20.00112176010607,
"Az_Pos_Err": 0
}
"""
control = self.acu_control.streams['main']
return self._udp_stream_handler(
session, 'main', control, self.data['broadcast'],
'acu_udp_stream', 'acu_broadcast_influx',
auto_enable=params['auto_enable'],
influx_suffix='_bcast_influx')
[docs]
@ocs_agent.param('auto_enable', type=bool, default=True)
def broadcast_ext(self, session, params):
"""broadcast_ext(auto_enable=True)
**Process** - Read UDP data from the "ext" UDP stream, as
defined in self.acu_config. Like the broadcast process, this
will write full rate data to an aggregator feed and
downsampled data to an influx feed.
Args:
auto_enable (bool): If True, the Process will try to
configure and (re-)enable the UDP stream if at any point
the stream seems to drop out.
Notes:
The session.data is as you would find for the broadcast
process with a "Time" field and a bunch of readings, updated
about once per second.
"""
control = self.acu_control.streams['ext']
data_store = {}
return self._udp_stream_handler(
session, 'ext', control, data_store,
'acu_ext_stream', 'acu_ext_influx',
auto_enable=params['auto_enable'],
influx_suffix='_ext')
@inlineCallbacks
def _udp_stream_handler(self, session, stream_name, stream_control,
data_store, agg_feed, influx_feed,
auto_enable=True, influx_suffix=''):
"""Collect data from UDP (200 Hz) stream. This is a helper
function that can be used to monitor either PositionBroadcast
or PositionBroadcastExt.
Args:
session: session object for parent Process
stream_name (str): stream identifier string for log messages
stream_control: soaculib BroadcastStreamControl instance
data_store (dict): place to put the latest readings
agg_feed (str): feed name for full rate data
influx_feed (str): feed name for downsampled data
auto_enable (bool): whether to use http API to turn stream on/off
if needed.
influx_suffix (str): suffix to append to all influx fields.
"""
session.data = {}
UDP_PORT = stream_control.p['Port']
schema = stream_control.p['schema']
# For unpacking
FMT = schema['format']
FMT_LEN = struct.calcsize(FMT)
# Confirm that first two fields are the timecode.
fields = list(schema['fields'])
assert fields[:2] == ['Day', 'Time']
fields = [f.replace(' ', '_') for f in fields[2:]]
# The udp_data list is used as a queue; it contains
# struct-unpacked samples from the UDP stream in the form
# (time_received, data).
udp_data = []
class MonitorUDP(protocol.DatagramProtocol):
def datagramReceived(self, data, src_addr):
now = time.time()
host, port = src_addr
offset = 0
while len(data) - offset >= FMT_LEN:
d = struct.unpack(FMT, data[offset:offset + FMT_LEN])
udp_data.append((now, d))
offset += FMT_LEN
handler = reactor.listenUDP(int(UDP_PORT), MonitorUDP())
influx_data = {k: [] for k in ['Time'] + fields}
best_dt = None
active = True
last_packet_time = time.time()
while session.status in ['running']:
now = time.time()
if len(udp_data) >= 200:
if not active:
self.log.info(f'UDP packets are being received [{stream_name}].')
active = True
last_packet_time = now
best_dt = None
process_data = udp_data[:200]
udp_data = udp_data[200:]
for recv_time, d in process_data:
time_d, fields_d = d[:2], d[2:]
data_ctime = sh.timecode(time_d[0] + time_d[1] / sh.DAY)
if best_dt is None or abs(recv_time - data_ctime) < best_dt:
best_dt = recv_time - data_ctime
data_store['Time'] = data_ctime
influx_data['Time'].append(data_ctime)
for _f, _d in zip(fields, fields_d):
data_store[_f] = _d
influx_data[_f].append(_d)
acu_udp_stream = {'timestamp': data_store['Time'],
'block_name': 'ACU_broadcast',
'data': data_store
}
self.agent.publish_to_feed(agg_feed, acu_udp_stream)
influx_means = {}
for key, vals in influx_data.items():
influx_means[key] = np.mean(vals)
influx_data[key] = []
acu_broadcast_influx = {
'timestamp': influx_means['Time'],
'block_name': 'ACU_bcast_influx',
'data': {k + influx_suffix: v for k, v in influx_means.items()},
}
self.agent.publish_to_feed(influx_feed, acu_broadcast_influx)
session.data.update(influx_means)
else:
# Consider logging an outage, attempting reconfig.
if active and now - last_packet_time > 3:
self.log.info(f'No UDP packets are being received [{stream_name}].')
active = False
next_reconfig = time.time()
if not active and auto_enable and next_reconfig <= time.time():
self.log.info(f'Requesting UDP stream enable [{stream_name}].')
try:
cfg, raw = yield stream_control.safe_enable()
except Exception as err:
self.log.info('Exception while trying to enable stream'
'[{stream_name}]: {err}', stream_name=stream_name, err=err)
next_reconfig += 60
self._broadcast_qual = {
'timestamp': now,
'active': active,
'time_offset': best_dt,
}
yield dsleep(.01)
handler.stopListening()
return True, 'Acquisition exited cleanly.'
@inlineCallbacks
def _check_daq_streams(self, stream):
yield
session = self.agent.sessions[stream]
if session.status != 'running':
self.log.warn("Process '%s' is not running" % stream)
return False
if stream == 'broadcast':
timestamp = self.data['broadcast'].get('Time')
else:
timestamp = self.data['status']['summary'].get('ctime')
if timestamp is None:
self.log.warn('%s daq stream has no data yet.' % stream)
return False
delta = time.time() - timestamp
if delta > 2:
self.log.warn(f'{stream} daq stream has old data ({delta} seconds)')
return False
return True
def _current_azel(self):
try:
az0, el0 = [self.data['status']['summary'][f'{ax}_current_position']
for ax in ['Azimuth', 'Elevation']]
if az0 is None or el0 is None:
raise KeyError
except KeyError:
return (None, None), 'Current position could not be determined.'
return (az0, el0), f'Current (az, el) = ({az0:.4f},{el0:.4f})'
@inlineCallbacks
def _check_ready_motion(self, session):
bcast_check = yield self._check_daq_streams('broadcast')
if not bcast_check:
return False, 'Motion blocked; problem with "broadcast" data acq process.'
monitor_check = yield self._check_daq_streams('monitor')
if not monitor_check:
return False, 'Motion blocked; problem with "monitor" data acq process.'
if self.data['status']['platform_status']['Remote_mode'] == 0:
self.log.warn('ACU in local mode, cannot perform motion with OCS.')
return False, 'ACU not in remote mode.'
return True, 'Agent state ok for motion.'
@inlineCallbacks
def _set_modes(self, az=None, el=None, third=None):
"""Helper for changing individual axis modes. Respects ignore_axes.
When setting one axis it is often necessary to write others as
well. The current mode is first queried, and written back
unmodified.
"""
modes = list((yield self.acu_control.mode(size=3)))
changes = [False, False, False]
for i, (k, v) in enumerate([('az', az), ('el', el), ('third', third)]):
if k not in self.ignore_axes and v is not None:
changes[i] = True
modes[i] = v
if not any(changes):
return
if not changes[2]:
yield self.acu_control.mode(modes[:2])
else:
yield self.acu_control.mode(modes)
@inlineCallbacks
def _stop(self, all_axes=False):
"""Helper for putting all axes in Stop. This will normally just issue
acu_control.stop(); but if any axes are being "ignored", and
the user has not passed all_axes=True, then it will avoid
changing the mode of those axes.
"""
if all_axes or len(self.ignore_axes) == 0:
yield self.acu_control.stop()
return
yield self._set_modes('Stop', 'Stop', 'Stop')
def _get_limit_func(self, axis):
"""Construct a function limit(x) that will enforce that x is within
the configured limits for axis. Returns the funcion and the
tuple of limits (lower, upper).
"""
if axis == 'az':
axis = 'azimuth'
elif axis == 'el':
axis = 'elevation'
limits = self.motion_limits[axis.lower()]
limits = limits['lower'], limits['upper']
def limit_func(target):
return max(min(target, limits[1]), limits[0])
return limit_func, limits
@inlineCallbacks
def _go_to_axis(self, session, axis, target,
state_feedback=None):
"""Execute a movement, using "Preset" mode, on a specific axis.
Args:
session: session object variable of the parent operation.
axis (str): one of 'Azimuth', 'Elevation', 'Boresight'.
target (float): target position.
state_feedback (dict): place to record state (see notes).
Returns:
ok (bool): True if the motion completed successfully and
arrived at target position.
msg (str): success/error message.
Notes:
This has various checks to ensure the movement executes as
expected and in a timely fashion. In the case that the
warning horn sounds, this function should block until that
completes, even if the requested position has been achieved
(i.e. no actual motion was needed).
The state_feedback may be used to pipeline the initial parts
of the movement, so two functions aren't trying to command
at the same time. The ``state_feedback`` dict should be
passed in initialized with ``{'state': 'init'}``. When
initial commanding is finished, this function will update it
to `state="wait"`, and then on completion to `state="done"`.
"""
# Step time in event loop.
TICK_TIME = 0.1
# Time for which to sample distance for "still" and "moving"
# conditions.
PROFILE_TIME = 1.
# When aborting, how many seconds to use to project a good
# stopping position (d = v*t)
ABORT_TIME = 2.
# Threshold (deg) for declaring that we've reached
# destination.
THERE_YET = 0.01
# How long to wait after initiation for signs of motion,
# before giving up. This is normally within 2 or 3 seconds
# (SATP), but in "cold" cases where siren needs to sound, this
# can be as long as 12 seconds. For the LAT, can take an
# extra couple seconds if there were faults to clear.
MAX_STARTUP_TIME = 15.
# How long does it take to sound the warning horn? It takes
# 10 seconds. Don't wait longer than this.
WARNING_HORN_TOO_LONG = 15.
# How long after mode change to Preset should we expect to see
# brakes released, except in case that warning horn is
# sounding? 3 seconds should be enough.
WARNING_HORN_DETECT = 3.
# Velocity to assume when computing maximum time a move should
# take (to bail out in unforeseen circumstances). There are
# other checks in place to catch when the platform has not
# started moving or has stopped at the wrong place. So the
# timeout computed from this should only activate in cases
# where some other commander has taken over and then kept the
# platform moving around.
UNREASONABLE_VEL = 0.1
# Enum for the motion states
State = Enum(f'{axis}State',
['INIT', 'WAIT_MOVING', 'WAIT_STILL', 'FAIL', 'DONE'])
if state_feedback is None:
state_feedback = {}
state_feedback['state'] = 'init'
# If this axis is "ignore", skip it.
for _axis, short_name in [
('Azimuth', 'az'),
('Elevation', 'el'),
('Boresight', 'third'),
]:
if _axis == axis and short_name in self.ignore_axes:
self.log.warn('Ignoring requested motion on {axis}', axis=axis)
state_feedback['state'] = 'done'
yield dsleep(1)
return True, 'axis successfully ignored'
# Specialization for different axis types.
class AxisControl:
def get_pos(_self):
return self.data['status']['summary'][f'{axis}_current_position']
def get_mode(_self):
return self.data['status']['summary'][f'{axis}_mode']
def get_vel(_self):
return self.data['status']['summary'][f'{axis}_current_velocity']
def get_active(_self):
return bool(
self.data['status']['axis_state'][f'{axis}_brakes_released']
and not self.data['status']['axis_state'][f'{axis}_axis_stop'])
class AzAxis(AxisControl):
@inlineCallbacks
def goto(_self, target):
result = yield self.acu_control.go_to(az=target, set_mode='target')
return result
class ElAxis(AxisControl):
@inlineCallbacks
def goto(_self, target):
result = yield self.acu_control.go_to(el=target, set_mode='target')
return result
class ThirdAxis(AxisControl):
def get_vel(_self):
return 0.
@inlineCallbacks
def goto(_self, target):
result = yield self.acu_control.go_3rd_axis(target)
return result
class LatCorotator(ThirdAxis):
def get_pos(_self):
return self.data['status']['corotator']['Corotator_current_position']
def get_mode(_self):
return self.data['status']['corotator']['Corotator_mode']
def get_active(_self):
return bool(
self.data['status']['corotator']['Corotator_brakes_released']
and not self.data['status']['corotator']['Corotator_axis_stop'])
ctrl = None
if axis == 'Azimuth':
ctrl = AzAxis()
elif axis == 'Elevation':
ctrl = ElAxis()
elif axis == 'Boresight':
if self.acu_config['platform'] in ['ccat', 'lat']:
ctrl = LatCorotator()
else:
ctrl = ThirdAxis()
if ctrl is None:
return False, f"No configuration for axis={axis}"
limit_func, _ = self._get_limit_func(axis)
# History of recent distances from target.
history = []
def get_history(t):
# Returns (ok, hist) where hist is roughly the past t
# seconds of position data and ok is whether or not
# that much history was actually available.
n = int(t // TICK_TIME) + 1
return (n <= len(history)), history[-n:]
last_state = None
state = State.INIT
start_time = None
motion_aborted = False
assumption_fail = False
motion_completed = False
give_up_time = None
has_never_moved = True
warning_horn = False
while session.status in ['starting', 'running', 'stopping']:
# Time ...
now = time.time()
if start_time is None:
start_time = now
time_since_start = now - start_time
motion_expected = time_since_start > MAX_STARTUP_TIME
# Space ...
current_pos, current_vel = ctrl.get_pos(), ctrl.get_vel()
distance = abs(target - current_pos)
history.append(distance)
if give_up_time is None:
give_up_time = now + distance / UNREASONABLE_VEL \
+ MAX_STARTUP_TIME + 2 * PROFILE_TIME \
+ WARNING_HORN_TOO_LONG
# Do we seem to be moving / not moving?
ok, _d = get_history(PROFILE_TIME)
still = ok and (np.std(_d) < 0.01)
moving = ok and (np.std(_d) >= 0.01)
has_never_moved = (has_never_moved and not moving)
near_destination = distance < THERE_YET
mode_ok = (ctrl.get_mode() == 'Preset')
active_now = ctrl.get_active()
# Log only on state changes
if state != last_state:
_state = f'{axis}.state={state.name}'
self.log.info(
f'{_state:<30} dt={now - start_time:7.3f} dist={distance:8.3f}')
last_state = state
# Handle task abort
if session.status == 'stopping' and not motion_aborted:
target = limit_func(current_pos + current_vel * ABORT_TIME)
state = State.INIT
motion_aborted = True
# Turn "too long" into an immediate exit.
if now > give_up_time:
self.log.error('Motion did not complete in a timely fashion; exiting.')
assumption_fail = True
break
# Main state machine
if state == State.INIT:
# Set target position and change mode to Preset.
result = yield ctrl.goto(target)
if result in OK_RESPONSES:
state = State.WAIT_MOVING
else:
self.log.error(f'ACU rejected go_to with message: {result}')
state = State.FAIL
# Reset the clock for tracking "still" / "moving".
history = []
start_time = time.time()
elif state == State.WAIT_MOVING:
# Position and mode change requested, now wait for
# either mode change or clear failure of motion.
if mode_ok:
if active_now:
state = state.WAIT_STILL
elif time_since_start > WARNING_HORN_TOO_LONG:
self.log.error('Warning horn too long!')
state = state.FAIL
elif time_since_start > WARNING_HORN_DETECT and not warning_horn:
warning_horn = True
self.log.info('Warning horn is probably sounding.')
elif still and motion_expected:
self.log.error(f'Motion did not start within {MAX_STARTUP_TIME:.1f} s.')
state = state.FAIL
elif state == State.WAIT_STILL:
# Once moving, watch for end of motion.
state_feedback['state'] = 'wait'
if not mode_ok:
self.log.error('Unexpected axis mode transition; exiting.')
state = State.FAIL
elif still:
if near_destination:
state = State.DONE
elif has_never_moved and motion_expected:
# The settling time, near a soft limit, can be
# a bit long ... so only timeout on
# motion_expected if we've never moved at all.
self.log.error(f'Motion did not start within {MAX_STARTUP_TIME:.1f} s.')
state = State.FAIL
elif state == State.FAIL:
# Move did not complete as planned.
assumption_fail = True
break
elif state == State.DONE:
# We seem to have arrived at destination.
motion_completed = True
break
# Keep only ~20 seconds of history ...
_, history = get_history(20.)
yield dsleep(TICK_TIME)
success = motion_completed and not (motion_aborted or assumption_fail)
if success:
msg = 'Move complete.'
elif motion_aborted:
msg = 'Move aborted!'
else:
msg = 'Irregularity during motion!'
state_feedback['state'] = 'done'
return success, msg
@inlineCallbacks
def _go_to_axes(self, session, el=None, az=None, third=None,
clear_faults=False):
"""Execute a movement along multiple axes, using "Preset"
mode. This just launches _go_to_axis on each required axis,
and collects the results.
Args:
session: session object variable of the parent operation.
az (float): target for Azimuth axis (ignored if None).
el (float): target for Elevation axis (ignored if None).
third (float): target for Boresight axis (ignored if None).
clear_faults (bool): whether to clear ACU faults first.
Returns:
ok (bool): True if all motions completed successfully and
arrived at target position.
msg (str): success/error message (combined from each target
axis).
"""
# Construct args for each _go_to_axis command... don't create
# the Deferred here, because we will want to clear_faults
# first (and the Deferred might start running before that
# completes).
move_defs = []
for axis_name, short_name, target in [
('Azimuth', 'az', az),
('Elevation', 'el', el),
('Boresight', 'third', third),
]:
if target is not None:
move_defs.append(
(short_name, (session, axis_name, target)))
if len(move_defs) == 0:
return True, 'No motion requested.'
if clear_faults:
yield self.acu_control.clear_faults()
yield dsleep(1)
# Start each move, waiting for each to pass the "init" state
# before beginning the next one.
moves = []
for name, args in move_defs:
fb = {'state': 'init'}
move_def = self._go_to_axis(*args, state_feedback=fb)
while fb['state'] == 'init':
yield dsleep(.1)
moves.append(move_def)
# Now wait for all to complete.
moves = yield DeferredList(moves)
all_ok, msgs = True, []
for _ok, result in moves:
if _ok:
all_ok = all_ok and result[0]
msgs.append(result[1])
else:
all_ok = False
msgs.append(f'Crash! {result}')
if all_ok:
msg = msgs[0]
else:
msg = ' '.join([f'{name}: {msg}'
for (name, args), msg in zip(move_defs, msgs)])
return all_ok, msg
[docs]
@ocs_agent.param('az', type=float, default=None)
@ocs_agent.param('el', type=float, default=None)
@ocs_agent.param('end_stop', default=True, type=bool)
@inlineCallbacks
def go_to(self, session, params):
"""go_to(az, el, end_stop=True)
**Task** - Move the telescope to a particular point (azimuth,
elevation) in Preset mode. When motion has ended and the telescope
reaches the preset point, the function returns.
Parameters:
az (float): destination angle for the azimuth axis
el (float): destination angle for the elevation axis
end_stop (bool): put the commanded axes in Stop mode at
the end of the motion
Notes:
If az or el is unspecified (None), the axis will not be
commanded to a new position and will not be put in Preset
mode, and will not be put in Stop (if end_stop) after motion.
When omitting el, and if Sun Avoidance path-finding
decides an elevation change is required to travel from the
current position to the implicit target position, the
task will exit with error.
"""
with self.azel_lock.acquire_timeout(0, job='go_to') as acquired:
if not acquired:
return False, f"Operation failed: {self.azel_lock.job} is running."
if self._get_sun_policy('motion_blocked'):
return False, "Motion blocked; Sun avoidance in progress."
self.log.info('Clearing faults to prepare for motion.')
yield self.acu_control.clear_faults()
yield dsleep(1)
ok, msg = yield self._check_ready_motion(session)
if not ok:
return False, msg
targets = {k: params[k] for k in ['az', 'el']}
def axis_filter_args(az, el):
return {k: v for k, v in [('az', az), ('el', el)]
if targets[k] is not None}
for axis, target in targets.items():
limit_func, limits = self._get_limit_func(axis)
if target is not None and target != limit_func(target):
raise ocs_agent.ParamError(
f'{axis}={target} not in accepted range, '
f'[{limits[0]}, {limits[1]}].')
self.log.info('Requested position: ' + ', '.join(
[f'{axis}={target}' for axis, target in targets.items()]))
legs, msg = yield self._get_sunsafe_moves(targets['az'], targets['el'])
if msg is not None:
self.log.error(msg)
return False, msg
if len(legs) > 2:
if None in targets.values():
return False, "Sun-safe path requires multiple moves, but simple path requested."
self.log.info(f'Executing move via {len(legs) - 1} separate legs (sun optimized)')
# Check HWP safety
hwp_safe, msg = yield self._check_hwpsafe_legs(legs)
if not hwp_safe:
self.log.info('{msg}', msg=msg)
return False, msg
for leg_az, leg_el in legs[1:]:
all_ok, msg = yield self._go_to_axes(session, **axis_filter_args(leg_az, leg_el))
if not all_ok:
break
if all_ok and params['end_stop']:
yield self._set_modes(**axis_filter_args('Stop', 'Stop'))
return all_ok, msg
[docs]
@ocs_agent.param('target', type=float)
@ocs_agent.param('end_stop', default=True, type=bool)
@inlineCallbacks
def set_boresight(self, session, params):
"""set_boresight(target, end_stop=True)
**Task** - Move the telescope to a particular third-axis angle.
Parameters:
target (float): destination angle for boresight rotation
end_stop (bool): put axis in Stop mode after motion
"""
with self.boresight_lock.acquire_timeout(0, job='set_boresight') as acquired:
if not acquired:
return False, f"Operation failed: {self.boresight_lock.job} is running."
hwp_ok, msg = self._check_hwpsafe_here(['third'])
if not hwp_ok:
self.log.info('{msg}', msg=msg)
return False, f"Motion not HWP-safe: {msg}"
self.log.info('Clearing faults to prepare for motion.')
yield self.acu_control.clear_faults()
yield dsleep(1)
ok, msg = yield self._check_ready_motion(session)
if not ok:
return False, msg
target = params['target']
for axis, target in {'boresight': target}.items():
limit_func, limits = self._get_limit_func(axis)
if target != limit_func(target):
raise ocs_agent.ParamError(
f'{axis}={target} not in accepted range, '
f'[{limits[0]}, {limits[1]}].')
self.log.info(f'Commanded position: boresight={target}')
ok, msg = yield self._go_to_axis(session, 'Boresight', target)
if ok and params['end_stop']:
yield self._set_modes(third='Stop')
return ok, msg
[docs]
@ocs_agent.param('target')
@ocs_agent.param('end_stop', default=True, type=bool)
@inlineCallbacks
def go_to_named(self, session, params):
"""go_to_named(target, end_stop=True)
**Task** - Move the telescope to a named position,
e.g. "home", that has been configured through command line args.
Parameters:
target (str): name of the target position.
end_stop (bool): put axes in Stop mode after motion
"""
target = self.named_positions.get(params['target'])
if target is None:
return False, 'Position "%s" is not configured.' % params['target']
ok, msg, _session = self.agent.start('go_to', {'az': target[0], 'el': target[1],
'end_stop': params['end_stop']})
if ok == ocs.ERROR:
return False, 'Failed to start go_to task.'
ok, msg, _session = yield self.agent.wait('go_to')
return (ok == ocs.OK), msg
[docs]
@ocs_agent.param('speed_mode', choices=['high', 'low'])
@inlineCallbacks
def set_speed_mode(self, session, params):
"""set_speed_mode(speed_mode)
**Task** - Set the ACU Speed Mode. This affects motion when
in Preset mode, such as when using go_to in this Agent. It
should not affect the speed of scans done in ProgramTrack
mode.
Parameters:
speed_mode (str): 'high' or 'low'.
Notes:
The axes must be in Stop mode for this to work. This task
will return an error if the command appears to have failed.
The actual speed and acceleration settings for the "high"
and "low" (perhaps called "aux") settings must be configured
on the ACU front panel.
"""
http = aculib.streams.ModularHttpInterface(
self.acu_config['dev_url'], backend=TwistedHttpBackend())
data = 'Command=Set Speed ' + params['speed_mode'].capitalize()
resp_bytes = yield http.Post(data, 'DataSets.CmdGeneralTransfer', '3')
resp = resp_bytes.decode('utf8')
if '<p>Status: executed</p>' in resp:
return True, "Speed mode changed."
elif '<p>Status: not allowed</p>' in resp:
return False, "Mode change blocked (are you in Stop?)"
else:
return False, "Response was not as expected."
[docs]
@ocs_agent.param('az_speed', type=float, default=None)
@ocs_agent.param('az_accel', type=float, default=None)
@ocs_agent.param('el_freq', type=float, default=None)
@ocs_agent.param('el_mode', choices=['stop', 'preset', 'programtrack', ''],
default=None)
@ocs_agent.param('turnaround_method', type=str, default=None,
choices=[None, 'standard', 'standard_gen',
'three_leg', 'two_leg'])
@ocs_agent.param('reset', default=False, type=bool)
@inlineCallbacks
def set_scan_params(self, session, params):
"""set_scan_params(az_speed=None, az_accel=None, reset=False))
**Task** - Update the default scan parameters, used by
generate_scan if not passed explicitly.
Parameters:
az_speed (float, optional): The azimuth scan speed.
az_accel (float, optional): The (average) azimuth
acceleration at turn-around.
el_freq (float, optional): The frequency of elevation nods in
type 3 scans.
el_mode (str, optional): If not null, the elevation axis
will be put in this mode after the initial position seek
(but before scan begins). This can be used to do type 1/2
scans with el axis held in Stop mode. The special value
of '' will revert el_mode to the default (None).
reset (bool, optional): If True, reset all params to default
values before applying any updates passed explicitly here.
"""
if params['reset']:
self.scan_params.update(self.default_scan_params)
for k in ['az_speed', 'az_accel', 'el_freq', 'turnaround_method', 'el_mode']:
if params[k] is not None:
self.scan_params[k] = params[k]
if params['el_mode'] == '':
self.scan_params['el_mode'] = None
self.log.info('Updated default scan params to {sp}', sp=self.scan_params)
yield
return True, 'Done'
[docs]
@ocs_agent.param('_')
@inlineCallbacks
def clear_faults(self, session, params):
"""clear_faults()
**Task** - Clear any axis faults.
"""
yield self.acu_control.clear_faults()
session.set_status('stopping')
return True, 'Job completed.'
[docs]
@ocs_agent.param('all_axes', default=False, type=bool)
@inlineCallbacks
def stop_and_clear(self, session, params):
"""stop_and_clear(all_axes=False)
**Task** - Change the azimuth, elevation, and 3rd axis modes
to Stop; also clear the ProgramTrack stack.
Args:
all_axes (bool): Send Stop to all axes, even ones user has
requested to be ignored.
"""
def _read_modes():
modes = [self.data['status']['summary']['Azimuth_mode'],
self.data['status']['summary']['Elevation_mode']]
if self.acu_config['platform'] == 'satp':
modes.append(self.data['status']['summary']['Boresight_mode'])
elif self.acu_config['platform'] in ['ccat', 'lat']:
modes.append(self.data['status']['corotator']['Corotator_mode'])
return modes
for i in range(6):
for short_name, mode in zip(['az', 'el', 'third'],
_read_modes()):
if (params['all_axes'] or short_name not in self.ignore_axes) and mode != 'Stop':
break
else:
self.log.info('All axes in Stop mode')
break
yield self._stop(params['all_axes'])
self.log.info('Stop called (iteration %i)' % (i + 1))
yield dsleep(0.1)
else:
msg = 'Failed to set all axes to Stop mode!'
self.log.error(msg)
return False, msg
for i in range(6):
free_stack = self.data['status']['summary']['Free_upload_positions']
if free_stack < FULL_STACK:
yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer',
'Clear Stack')
self.log.info('Clear Stack called (iteration %i)' % (i + 1))
yield dsleep(0.1)
else:
self.log.info('Stack cleared')
break
else:
msg = 'Failed to clear the ProgramTrack stack!'
self.log.warn(msg)
return False, msg
session.set_status('stopping')
return True, 'Job completed'
[docs]
@ocs_agent.param('action', choices=['unstow', 'elsync'])
@ocs_agent.param('force', type=bool, default=False)
@ocs_agent.param('elsync_ref', type=float, default=None)
@inlineCallbacks
def special_action(self, session, params):
"""special_action(action, force=False, elsync_ref=None)
**Task** - Perform a special action or set a special mode.
Args:
action (str): Action to perform. See notes.
force (bool): Perform the action even if conditions suggest
it need not or should not be run.
elsync_ref (float): For action='elsync', sets the reference
elevation for the locked co-rotator mode. (This is the
negative of the ACU offset parameter.)
Notes:
- 'unstow': Set the el and az axis modes to "UnStow". This
is used to recover the LAT from "maintenance stow"
position, where el=-90 and pins inserted. The Task
returns after setting the mode; transition to Stop will
normally occur after a few seconds.
- 'elsync': Put the LAT corotator into ElSync mode. If
elsync_ref is provided, that is sent to the ACU
first. Otherwise the offset is left unchanged. Unless
force=True, the corotator axis should be in Stop before
when this is called.
"""
if params['action'] == 'unstow':
if not params['force']:
el_mode = self.data['status']['summary']['Elevation_mode']
if el_mode.lower() not in ['stow', 'maintenancestow']:
return False, f"Not unstowing because elevation mode is {el_mode}; "\
"override with force=True."
response = yield self._set_modes(az='Stop', el='UnStow')
self.log.info('response to UnStow: {response}', response=response)
yield dsleep(0.5)
elif params['action'] == 'elsync':
if not params['force']:
third_mode = self.data['status']['corotator']['Corotator_mode']
if third_mode.lower() != 'stop':
return False, f"Not going to elsync mode because corotator mode is {third_mode}; "\
"override with force=True."
if params['elsync_ref'] is not None:
response = yield self.acu_control.http.Command(
'DataSets.Corotator', 'SetOffsetToElevation', '%.6f' % (-params['elsync_ref']))
self.log.info('response to set elsync_ref : {response}', response=response)
yield dsleep(0.5)
response = yield self._set_modes(third='ElSync')
self.log.info('response to set ElSync mode: {response}', response=response)
yield dsleep(0.5)
else:
return False, f"Unimplemented action '{params['action']}'."
return True, 'Done.'
[docs]
@ocs_agent.param('filename', type=str)
@ocs_agent.param('absolute_times', type=bool, default=False)
@ocs_agent.param('azonly', type=bool, default=True)
@inlineCallbacks
def fromfile_scan(self, session, params=None):
"""fromfile_scan(filename, absolute_times=False, azonly=True)
**Process** - Upload and execute a scan pattern from a file.
Parameters:
filename (str): full path to the track file.
absolute_times (bool): If True, the track timestamps are
taken at face value. Otherwise, the timestamps are
treated as relative to the track start time, which
will be a few seconds in the future from when this
function is called.
azonly (bool): If True, the elevation part of the track
will be uploaded but the el axis won't be put in
ProgramTrack mode. It might be put in Stop mode
though.
Notes:
See :func:`drivers.from_file
<socs.agents.acu.drivers.from_file>` for discussion of the
file structure.
"""
ff_scan = sh.from_file(params['filename'])
if ff_scan.az_range[0] <= self.motion_limits['azimuth']['lower'] \
or ff_scan.az_range[1] >= self.motion_limits['azimuth']['upper']:
return False, 'Azimuth location out of range!'
if ff_scan.el_range[0] <= self.motion_limits['elevation']['lower'] \
or ff_scan.el_range[1] >= self.motion_limits['elevation']['upper']:
return False, 'Elevation location out of range!'
# Modify times?
t_shift = 0
if not params['absolute_times']:
t_shift = time.time() + 5.
# Turn those lines into a generator.
def line_batcher(ff_scan, t_shift=0., n=10):
lines = [sh.track_point_time_shift(p, t_shift)
for p in ff_scan.points]
while True:
while len(lines):
some, lines = lines[:n], lines[n:]
yield some
if ff_scan.loop_time <= 0:
break
t_shift += ff_scan.loop_time
lines = [sh.track_point_time_shift(p, t_shift)
for p in ff_scan.points[ff_scan.preamble_count:]]
point_gen = line_batcher(ff_scan, t_shift)
if params['azonly']:
track_axes = ['az']
else:
track_axes = ['az', 'el']
ok, err = yield self._run_track(
session,
point_gen,
step_time=ff_scan.step_time,
free_form=ff_scan.free_form,
track_axes=track_axes)
return ok, err
[docs]
@ocs_agent.param('az_endpoint1', type=float)
@ocs_agent.param('az_endpoint2', type=float)
@ocs_agent.param('az_speed', type=float, default=None)
@ocs_agent.param('az_accel', type=float, default=None)
@ocs_agent.param('el_endpoint1', type=float, default=None)
@ocs_agent.param('el_endpoint2', type=float, default=None)
@ocs_agent.param('el_speed', type=float, default=0.)
@ocs_agent.param('el_freq', type=float, default=None)
@ocs_agent.param('el_mode', choices=['stop', 'preset', 'programtrack'],
default=None)
@ocs_agent.param('num_scans', type=float, default=None)
@ocs_agent.param('start_time', type=float, default=None)
@ocs_agent.param('wait_to_start', type=float, default=None)
@ocs_agent.param('step_time', type=float, default=None)
@ocs_agent.param('az_start', default='end',
choices=['end', 'mid', 'az_endpoint1', 'az_endpoint2',
'mid_inc', 'mid_dec'])
@ocs_agent.param('az_drift', type=float, default=None)
@ocs_agent.param('scan_type', default=1, choices=[1, 2, 3])
@ocs_agent.param('az_vel_ref', type=float, default=None)
@ocs_agent.param('turnaround_method', default=None,
choices=[None, 'standard', 'standard_gen',
'three_leg', 'two_leg'])
@ocs_agent.param('scan_upload_length', type=float, default=None)
@ocs_agent.param('type', default=None, choices=[1, 2, 3])
@inlineCallbacks
def generate_scan(self, session, params):
"""generate_scan(az_endpoint1, az_endpoint2, \
az_speed=None, az_accel=None, \
el_endpoint1=None, el_endpoint2=None, \
el_speed=None, el_freq=None, \
el_mode=None, \
num_scans=None, start_time=None, \
wait_to_start=None, step_time=None, \
az_start='end', az_drift=None, \
scan_type=1, az_vel_ref=None, \
turnaround_method=None, \
scan_upload_length=None)
**Process** - Scan generator, currently only works for
constant-velocity az scans with fixed elevation.
Parameters:
az_endpoint1 (float): first endpoint of a linear azimuth scan
az_endpoint2 (float): second endpoint of a linear azimuth scan
az_speed (float): azimuth speed for constant-velocity scan
az_accel (float): turnaround acceleration for a constant-velocity scan
el_endpoint1 (float): first endpoint of elevation motion.
In the present implementation, this will be the
constant elevation declared at every point in the
track.
el_endpoint2 (float): this is ignored.
el_speed (float): this is ignored.
el_freq (float): frequency of the elevation nods for
scan_type=3.
el_mode (str): By default, the elevation axis mode for
type 1 and 2 scans will be left in Preset after the
initial move. To force it instead into Stop mode,
pass "stop" (case-sensitive) here. ("preset" and
"programtrack" are also accepted, and will result in
that mode being set prior to launching the track.)
num_scans (int or None): if not None, limits the scan to
the specified number of constant velocity legs. The
process will exit without error once that has
completed.
start_time (float or None): a unix timestamp giving the
time at which the scan should begin. The default is
None, which means the scan will start immediately (but
taking into account the value of wait_to_start).
wait_to_start (float): number of seconds to wait before
starting a scan, in the case that start_time is None.
The default is to compute a minimum time based on the
scan parameters and the ACU ramp-up algorithm; this is
typically 5-10 seconds.
step_time (float): time, in seconds, between points on the
constant-velocity parts of the motion. The default is
None, which will cause an appropriate value to be
chosen automatically (typically 0.1 to 1.0).
az_start (str): part of the scan to start at. To start at one
of the extremes, use 'az_endpoint1', 'az_endpoint2', or
'end' (same as 'az_endpoint1'). To start in the midpoint
of the scan use 'mid_inc' (for first half-leg to have
positive az velocity), 'mid_dec' (negative az velocity),
or 'mid' (velocity oriented towards endpoint2).
az_drift (float): if set, this should be a drift velocity
in deg/s. The scan extrema will move accordingly. This
can be used to better follow compact sources as they
rise or set through the focal plane.
scan_type (int): What type of scan to use. Only 1, 2, 3 are valid.
Type 1 is a constant elevation scan.
Type 2 includes a variation in az speed that scales as sin(az).
Type 3 is a Type 2 with an sinusoidal el nod.
az_vel_ref (float or None): azimuth to center the velocity profile at.
If None then the average of the endpoints is used.
turnaround_method (str): The method used for generating turnaround.
Default (None) generates the baseline minimal jerk trajectory.
'standard' uses the acu standard turnaround generation (same as None).
'standard_gen' generates a track_point list of points that mimics
the acu standard turnaround generation for use in type2/type3 scans.
'three_leg' generates a three-leg turnaround which attempts to
minimize the acceleration at the midpoint of the turnaround.
'two_leg' generates a three-leg turnaround with second_leg_time = 0.
scan_upload_length (float): number of seconds for each set
of uploaded points. If this is not specified, the
track manager will try to use as short a time as is
reasonable.
type (int): Temporary alias for scan_type. Do not
use. Will be removed.
Notes:
Note that all parameters are optional except for
az_endpoint1 and az_endpoint2. If only those two parameters
are passed, the Process will scan between those endpoints,
with the elevation axis held in Stop, indefinitely (until
Process .stop method is called)..
"""
init_time = time.time() # for params feed.
if self._get_sun_policy('motion_blocked'):
return False, "Motion blocked; Sun avoidance in progress."
if params['type'] is not None:
self.log.warn('Caller passed "type" instead of "scan_type" arg; moving.')
params['scan_type'] = params['type']
del params['type']
self.log.info('User scan params: {params}', params=params)
az_endpoint1 = params['az_endpoint1']
az_endpoint2 = params['az_endpoint2']
el_endpoint1 = params['el_endpoint1']
el_endpoint2 = params['el_endpoint2']
az_vel_ref = params['az_vel_ref']
# Params with defaults configured ...
az_speed = params['az_speed']
az_accel = params['az_accel']
el_freq = params['el_freq']
turnaround_method = params['turnaround_method']
el_mode = params['el_mode']
if az_speed is None:
az_speed = self.scan_params['az_speed']
if az_accel is None:
az_accel = self.scan_params['az_accel']
if el_freq is None:
el_freq = self.scan_params['el_freq']
if turnaround_method is None:
turnaround_method = self.scan_params['turnaround_method']
if params['scan_type'] in [2, 3] and turnaround_method == 'standard':
turnaround_method = 'standard_gen'
self.log.info('Setting turnaround_method="standard_gen" for type2/3 scan.')
if el_mode is None:
el_mode = self.scan_params['el_mode'] # ... which may also be None.
# Check if the turnaround method is usable for the called scan type.
# This should never happen with the above turnaround_method setting.
if turnaround_method == "standard" and params['scan_type'] != 1:
raise ValueError("Cannot use standard turnaround method with type 2 or 3 scans!")
# Do we need to limit the az_accel? This limit comes from a
# maximum jerk parameter; the equation below (without the
# empirical 0.85 adjustment) is stated in the SATP ACU ICD.
min_turnaround_time = (0.85 * az_speed / 9 * 11.616)**.5
max_turnaround_accel = 2 * az_speed / min_turnaround_time
# You must also not exceed the platform max accel.
if self.motion_limits['azimuth'].get('accel'):
max_turnaround_accel = min(
max_turnaround_accel,
self.motion_limits['azimuth'].get('accel') / 1.88)
if az_accel > max_turnaround_accel:
self.log.warn('WARNING: user requested accel=%.2f; limiting to %.2f' %
(az_accel, max_turnaround_accel))
az_accel = max_turnaround_accel
# If el is not specified, drop in the current elevation.
if el_endpoint1 is None:
el_endpoint1 = self.data['status']['summary']['Elevation_current_position']
if el_endpoint2 is None:
el_endpoint2 = el_endpoint1
# If requested el is just outside acceptable range, tweak it in.
_f, _ = self._get_limit_func('elevation')
el_endpoint1, _untweaked_el = _f(el_endpoint1), el_endpoint1
if abs(el_endpoint1 - _untweaked_el) > 0.1:
return False, "Current elevation (%.4f) is well outside limits." % _untweaked_el
init_el = el_endpoint1
scan_upload_len = params.get('scan_upload_length')
scan_params = {k: params.get(k) for k in [
'num_scans', 'num_batches', 'start_time',
'wait_to_start', 'step_time', 'batch_size',
'az_start', 'az_drift']
if params.get(k) is not None}
if params['scan_type'] in [2, 3]:
scan_params["az_start"] = "mid_dec"
el_speed = params.get('el_speed', 0.0)
az_edge_speed = az_speed
if params['scan_type'] in [2, 3]:
if az_vel_ref is None:
az_vel_ref = (az_endpoint1 + az_endpoint2) / 2.
az_cent = az_vel_ref - 90
az_edge = np.max(np.abs((az_endpoint1 - az_cent, az_endpoint2 - az_cent)))
az_edge_speed = az_speed / np.sin(az_edge)
plan = sh.plan_scan(az_endpoint1, az_endpoint2,
el=el_endpoint1, v_az=az_edge_speed, a_az=az_accel,
az_start=scan_params.get('az_start'),
scan_type=params['scan_type'])
# Use the plan to set scan upload parameters.
if scan_params.get('step_time') is None:
scan_params['step_time'] = plan['step_time']
if scan_params.get('wait_to_start') is None:
scan_params['wait_to_start'] = plan['wait_to_start']
step_time = scan_params['step_time']
point_batch_count = None
if scan_upload_len:
point_batch_count = scan_upload_len / step_time
self.log.info('The plan: {plan}', plan=plan)
self.log.info('The scan_params: {scan_params}', scan_params=scan_params)
# Before any motion, check for sun safety.
ok, msg = self._check_scan_sunsafe(az_endpoint1, az_endpoint2, el_endpoint1,
az_speed, az_accel)
if ok:
self.log.info('Sun safety check: {msg}', msg=msg)
else:
self.log.error('Sun safety check fails: {msg}', msg=msg)
return False, 'Scan is not Sun Safe.'
# Clear faults.
self.log.info('Clearing faults to prepare for motion.')
yield self.acu_control.clear_faults()
yield dsleep(1)
# Verify we're good to move
ok, msg = yield self._check_ready_motion(session)
if not ok:
return False, msg
# Seek to starting position. Note "legs" will always include
# at least 2 points; first point being current (az, el).
self.log.info(f'Moving to start position, az={plan["init_az"]}, el={init_el}')
legs, msg = yield self._get_sunsafe_moves(plan['init_az'], init_el)
if msg is not None:
self.log.error(msg)
return False, msg
hwp_safe, msg = yield self._check_hwpsafe_legs(legs)
if not hwp_safe:
msg = f'Move to start position not permitted: {msg}'
self.log.info('{msg}', msg=msg)
return False, msg
# Also validate the scan generally -- need to be movable in az.
hwp_safe, msg = yield self._check_hwpsafe(init_el, init_el, axes=['az'])
if not hwp_safe:
msg = f'Const-el scan not permitted: {msg}'
self.log.info(msg)
return False, msg
for leg_az, leg_el in legs[1:]:
ok, msg = yield self._go_to_axes(session, az=leg_az, el=leg_el)
if not ok:
return False, f'Start position seek failed with message: {msg}'
# Force elevation axis to stop mode?
if el_mode:
for k in ['Stop', 'Preset', 'ProgramTrack']:
if el_mode.lower() == k.lower():
yield self._set_modes(el=k)
break
else:
return False, f'User requested invalid el_mode={el_mode}'
# Prepare the point generator.
free_form = False
if params['scan_type'] == 1:
track_axes = ['az']
if turnaround_method != 'standard':
free_form = True
g = sh.generate_constant_velocity_scan(az_endpoint1=az_endpoint1,
az_endpoint2=az_endpoint2,
az_speed=az_speed, acc=az_accel,
turnaround_method=turnaround_method,
el_endpoint1=el_endpoint1,
el_endpoint2=el_endpoint2,
el_speed=el_speed,
az_first_pos=plan['init_az'],
**scan_params)
elif params['scan_type'] == 2:
free_form = True
track_axes = ['az']
g = sh.generate_type2_scan(az_endpoint1=az_endpoint1,
az_endpoint2=az_endpoint2,
az_speed=az_speed, acc=az_accel,
turnaround_method=turnaround_method,
el_endpoint1=el_endpoint1,
az_vel_ref=az_vel_ref,
az_first_pos=plan['init_az'],
**scan_params)
elif params['scan_type'] == 3:
free_form = True
track_axes = ['az', 'el']
g = sh.generate_type3_scan(az_endpoint1=az_endpoint1,
az_endpoint2=az_endpoint2,
az_speed=az_speed, acc=az_accel,
turnaround_method=turnaround_method,
el_endpoint1=el_endpoint1,
el_endpoint2=el_endpoint2,
el_freq=el_freq,
az_vel_ref=az_vel_ref,
az_first_pos=plan['init_az'],
**scan_params)
else:
raise ValueError("Scan type must be 1, 2, or 3")
scan_params_bundle = {'session_id': session.session_id,
'schema': 1,
'event': 1,
'init_time': init_time,
}
scan_params_bundle.update({
'az1': az_endpoint1,
'az2': az_endpoint2,
'az_vel': az_speed,
'az_accel': az_accel,
'el1': el_endpoint1,
'el2': el_endpoint2,
'el_freq': el_freq,
'type': params['scan_type'],
'turnaround_type': sh.TURNAROUNDS_ENUM[turnaround_method],
'track_axes': ','.join(track_axes),
})
self.agent.publish_to_feed('scan_params',
{'timestamp': time.time(),
'block_name': 'info',
'data': scan_params_bundle})
ret_val = (yield self._run_track(
session=session, point_gen=g, step_time=step_time,
track_axes=track_axes, point_batch_count=point_batch_count,
free_form=free_form, unabort_failure=(params['scan_type'] in [2, 3])))
self.agent.publish_to_feed('scan_params',
{'timestamp': time.time(),
'block_name': 'exit',
'data': {'session_id': session.session_id,
'event': 2}})
return ret_val
@inlineCallbacks
def _run_track(self, session, point_gen, step_time, track_axes=['az'],
point_batch_count=None, free_form=False, unabort_failure=False):
"""Run a ProgramTrack track scan, with points provided by a
generator.
Args:
session: session object for the parent operation.
point_gen: generator that yields points
step_time: the minimum time between point track points.
This is used to guarantee that points are uploaded
sufficiently in advance for the servo unit to process
them.
track_axes: list of strings indicating which axes ('az',
'el') should be put in ProgramTrack mode. Axes not
included here will not have their mode changed.
point_batch_count: number of points to include in batch
uploads. This parameter can be used to increase the value
beyond the minimum set internally based on step_time.
free_form: if True, disable ACU linear interpolation and
turn-around profiling.
unabort_failure: if True don't fail on a bad exit.
Returns:
Tuple (success, msg) where success is a bool.
"""
# The approximate loop time
LOOP_STEP = 0.1 # seconds
# Time to allow for initial ProgramTrack transition.
MAX_PROGTRACK_SET_TIME = 5.
# Minimum number of points to have in the stack. While the
# docs strictly require 4, this number should be at least 1
# more than that to allow for rounding when we are setting the
# refill threshold.
MIN_STACK_POP = 6 # points
# Minimum amount of time (seconds), in advance, to populate
# the trajectory. In cases where step_time is short, this
# creates a longer track window to survive agent outages.
# (The cost is that stopping a scan may take a little longer.)
MIN_STACK_ADVANCE_TIME = 3.
# Minimum nuber of points to keep in the stack.
_pbc = max(MIN_STACK_POP, MIN_STACK_ADVANCE_TIME / step_time)
if point_batch_count is None or _pbc > point_batch_count:
point_batch_count = _pbc
STACK_REFILL_THRESHOLD = \
FULL_STACK - point_batch_count - LOOP_STEP / step_time
STACK_TARGET = \
FULL_STACK - point_batch_count * 2 - LOOP_STEP / step_time
# Special error bits to watch here
PTRACK_FAULT_KEYS = [
'ProgramTrack_position_failure',
'Track_start_too_early',
'Turnaround_accel_too_high',
'Turnaround_time_too_short',
]
if free_form:
init_cmds = [
('Clear Stack', 0.),
('Set Profiler Off', 0.),
('Set Interpolation Spline', 0.5)
]
else:
init_cmds = [
('Clear Stack', 0.),
('Set Profiler On', 0.),
('Set Interpolation Linear', 0.5)
]
with self.azel_lock.acquire_timeout(0, job='generate_scan') as acquired:
if not acquired:
return False, f"Operation failed: {self.azel_lock.job} is running."
if session.status not in ['starting', 'running']:
return False, "Operation aborted before motion began."
for _c, _d in init_cmds:
resp = yield self.acu_control.http.Command(
'DataSets.CmdTimePositionTransfer', _c)
if resp != b'OK, Command executed.':
return False, f"Failed to init: {_c}"
if _d > 0:
yield dsleep(_d)
if track_axes is not None and len(track_axes) > 0:
assert ([_ax in ['az', 'el'] for _ax in track_axes])
mode_args = {_ax: 'ProgramTrack' for _ax in track_axes}
yield self._set_modes(**mode_args)
yield dsleep(0.1)
# Values for mode are:
# - 'go' -- keep uploading points (unless there are no more to upload).
# - 'stop' -- do not request more points from generator;
# finish the ones that are already in "points", let the stack empty,
# and wait for settling condition.
# - 'abort' -- do not upload more points; exit loop with error; wait
# a few seconds and clear the stack.
mode = 'go'
lines = []
last_mode = None
last_upload_az = None
start_time = time.time()
got_progtrack = False
faults = {}
got_points_in = False
first_upload_time = None
wait_stop_timeout = None
prog_track_err = False
stop_message = ""
while True:
now = time.time()
current_modes = {'Az': self.data['status']['summary']['Azimuth_mode'],
'El': self.data['status']['summary']['Elevation_mode'],
'Remote': self.data['status']['platform_status']['Remote_mode']}
az_state = {'pos': self.data['status']['summary']['Azimuth_current_position'],
'vel': self.data['status']['summary']['Azimuth_current_velocity']}
free_positions = self.data['status']['summary']['Free_upload_positions']
# Use this var to detect case where we're uploading
# points but ACU is quietly dumping them because the
# vel is too high.
got_points_in = got_points_in \
or (got_progtrack and free_positions < FULL_STACK)
if last_mode != mode:
self.log.info(f'scan mode={mode}, line_buffer={len(lines)}, track_free={free_positions}')
last_mode = mode
for k in PTRACK_FAULT_KEYS:
if k not in faults and self.data['status']['ACU_failures_errors'].get(k):
self.log.info('Fault during track: "{k}"', k=k)
faults[k] = True
if mode != 'abort':
# Reasons we might decide to abort ...
if current_modes['Az'] == 'ProgramTrack':
got_progtrack = True
else:
if got_progtrack:
self.log.warn('Unexpected exit from ProgramTrack mode!')
if mode == 'stop':
prog_track_err = True
mode = 'abort'
elif now - start_time > MAX_PROGTRACK_SET_TIME:
self.log.warn('Failed to set ProgramTrack mode in a timely fashion.')
mode = 'abort'
if not got_points_in and (first_upload_time is not None) \
and (now - first_upload_time > 10):
self.log.warn('ACU seems to be dumping our track. Vel too high?')
mode = 'abort'
if current_modes['Remote'] == 0:
self.log.warn('ACU no longer in remote mode!')
mode = 'abort'
if session.status == 'stopping' and mode not in ['stop', 'abort']:
mode = 'stop'
stop_message = 'User-requested stop.'
lines = []
if mode == 'abort':
lines = []
# Is it time to upload more lines?
if free_positions >= STACK_REFILL_THRESHOLD:
new_line_target = max(int(free_positions - STACK_TARGET), 1)
# Make sure that you get one more line than you
# need (len(lines) > new_line_target), so that
# after "grabbing the minimum batch", below, there
# is still >= 1 line left. The lines-is-empty
# check is used to decide we're done.
while mode == 'go' and (len(lines) <= new_line_target or lines[-1].group_flag != 0):
try:
lines.extend(next(point_gen))
except StopIteration:
mode = 'stop'
stop_message = 'Stop due to end of the planned track.'
if len(lines) > FULL_STACK / 2:
# This is used to raise an error and
# abort; but it is more likely to occur in
# the case where bad group_flag handling,
# above, is messing things up. So we just
# break out and let some points get
# processed.
break
# Grab the minimum batch
upload_lines, lines = lines[:new_line_target], lines[new_line_target:]
# If the last line has a "group" flag, keep transferring lines.
while len(lines) and len(upload_lines) and upload_lines[-1].group_flag != 0:
upload_lines.append(lines.pop(0))
if len(upload_lines):
# Discard the group flag and upload all.
text = sh.get_track_points_text(
upload_lines, timestamp_offset=3, text_block=True)
for attempt in range(5):
_dt = time.time()
try:
# This seems to return b'Ok.' no matter ~what,
# so not much point checking it.
yield self.acu_control.http.UploadPtStack(text)
break
except Exception as err:
_dt = time.time() - _dt
self.log.warn(f'Upload {len(upload_lines)} failed (attempt {attempt}) after {_dt:.3f} seconds')
self.log.warn('Exception was: {err}', err=err)
else:
raise RuntimeError('Upload fail.')
if first_upload_time is None:
first_upload_time = time.time()
last_upload_az = upload_lines[-1].az
if len(lines) == 0 and free_positions >= FULL_STACK - 1:
if mode == 'stop':
if wait_stop_timeout is None:
self.log.info('Stack is empty; waiting for settling...')
wait_stop_timeout = now + 20.
elif now > wait_stop_timeout:
self.log.warn('Graceful stop condition not met in a timely fashion.')
mode = 'abort'
# Await safe exit condition.
pos_ok = last_upload_az is None or (
abs(az_state['pos'] - last_upload_az) < 0.01)
vel_ok = abs(abs(az_state['vel']) < .01)
if pos_ok and vel_ok:
break
else:
self.log.warn('Somehow ran out of points!')
break
yield dsleep(LOOP_STEP)
# Go to Stop mode?
# yield self.acu_control.stop()
# Wait a couple more seconds and clear the stack.
yield dsleep(2)
yield self.acu_control.http.Command('DataSets.CmdTimePositionTransfer',
'Clear Stack')
if mode == 'abort':
if unabort_failure and prog_track_err:
return True, 'Problems on shutdown but close enough.'
return False, 'Problems during scan'
return True, f'Scan ended. {stop_message}'
#
# Sun Safety Monitoring and Active Avoidance
#
def _reset_sun_params(self):
"""Resets self.sun_params based on the instance defaults, and
motion_limits. This must be called at least once, on startup,
to set up Sun monitoring and avoidance properly.
"""
# Set up sun_params data structure.
_p = {
# Global enable (but see "disable_until").
'active_avoidance': False,
# Can be set to a timestamp, in which case Sun Avoidance
# is disabled until that time has passed.
'disable_until': 0,
# Flag for indicating normal motions should be blocked
# (Sun Escape is active).
'block_motion': False,
# Flag for update_sun to indicate Sun map needs recomputed
'recompute_req': False,
# If set, should be a timestamp at which escape_sun_now
# will be initiated.
'next_drill': None,
# Parameters for the Sun Safety Map computation.
'safety_map_kw': {
'sun_time_shift': 0,
},
# Avoidance policy, for use in avoidance decisions.
'policy': {},
}
# Active avoidance?
_p['active_avoidance'] = self.sun_config['enabled']
# Avoidance requires platform limits and move policies
_p['policy'].update({
'min_az': self.motion_limits['azimuth']['lower'],
'max_az': self.motion_limits['azimuth']['upper'],
'min_el': self.motion_limits['elevation']['lower'],
'max_el': self.motion_limits['elevation']['upper'],
'axes_sequential': self.motion_limits.get('axes_sequential', False),
})
# User parameters defining the danger zone, and escape
# policies. This list should be kept consistent with the
# preamble docs in avoidance.py.
for k in [
'exclusion_radius',
'min_sun_time',
'response_time',
'el_horizon',
'el_dodging',
'axes_sequential',
]:
if k in self.sun_config:
_p['policy'][k] = self.sun_config[k]
self.sun_params = _p
def _get_sun_policy(self, key):
now = time.time()
p = self.sun_params
active = (p['active_avoidance'] and (now >= p['disable_until']))
if key == 'motion_blocked':
return active and p['block_motion']
elif key == 'sunsafe_moves':
return active
elif key == 'escape_enabled':
return active
elif key == 'map_valid':
return (self.sun is not None
and self.sun.base_time is not None
and self.sun.base_time <= now
and self.sun.base_time >= now - 2 * SUN_MAP_REFRESH)
else:
return p[key]
[docs]
@ocs_agent.param('_')
@inlineCallbacks
def monitor_sun(self, session, params):
"""monitor_sun()
**Process** - Monitors and reports the position of the Sun;
maintains a Sun Safety Map for verifying that moves and scans
are Sun-safe; triggers a "Sun escape" if the boresight enters
an unsafe position.
The monitoring functions are always active (as long as this
process is running). But the escape functionality must be
explicitly enabled (through the default platform
configuration, command line arguments, or the update_sun
task).
Session data looks like this::
{
"timestamp": 1698848292.5579932,
"active_avoidance": false,
"disable_until": 0,
"block_motion": false,
"recompute_req": false,
"next_drill": null,
"safety_map_kw": {
"sun_time_shift": 0
},
"policy": {
"exclusion_radius": 20,
"el_horizon": 10,
"min_sun_time": 1800,
"response_time": 7200,
"min_az": -90,
"max_az": 450,
"min_el": 18.5,
"max_el": 90
},
"sun_pos": {
"map_exists": true,
"map_is_old": false,
"map_ref_time": 1698848179.1123455,
"platform_azel": [
90.0158,
20.0022
],
"sun_radec": [
216.50815789438036,
-14.461844389380719
],
"sun_azel": [
78.24269024936028,
60.919554369324096
],
"sun_down": false,
"sun_dist": 41.75087242151837,
"sun_safe_time": 71760,
"platform_down": false
},
"avoidance": {
"safety_unknown": false,
"warning_zone": false,
"danger_zone": false,
"escape_triggered": false,
"escape_active": false,
"last_escape_time": 0,
"sun_is_real": true,
"platform_is_moveable": true
}
}
In debugging, the Sun position might be falsified. In that
case the "sun_pos" subtree will contain an entry like this::
"WARNING": "Fake Sun Position is in use!",
and "avoidance": "sun_is_real" will be set to false. (No
other functionality is changed when using a falsified Sun
position; flags are computed and actions decided based on the
false position.)
"""
def _get_sun_map():
# To run in thread ...
start = time.time()
new_sun = avoidance.SunTracker(policy=self.sun_params['policy'],
**self.sun_params['safety_map_kw'])
return new_sun, time.time() - start
def _notify_recomputed(result):
nonlocal req_out
new_sun, compute_time = result
self.log.info('(Re-)computed Sun Safety Map (took %.1fs)' %
compute_time)
self.sun = new_sun
req_out = False
def lookup(keys, tree):
if isinstance(keys, str):
keys = [keys]
if len(keys) == 0:
if isinstance(tree, (bool, np.bool_)):
return int(tree)
return tree
return lookup(keys[1:], tree[keys[0]])
# Feed -- unpack some elements of session.data
feed_keys = {
'sun_avoidance': ('active_avoidance', int),
'sun_az': (('sun_pos', 'sun_azel', 0), float),
'sun_el': (('sun_pos', 'sun_azel', 1), float),
'sun_dist': (('sun_pos', 'sun_dist'), float),
'sun_safe_time': (('sun_pos', 'sun_safe_time'), float),
'sun_down': (('sun_pos', 'sun_down'), int),
'platform_down': (('sun_pos', 'platform_down'), int),
}
for k in ['warning_zone', 'danger_zone',
'escape_triggered', 'escape_active']:
feed_keys[f'sun_{k}'] = (('avoidance', k), int)
feed_pacer = Pacemaker(.1)
req_out = False
self.sun = None
last_panic = 0
session.data = {}
while session.status in ['starting', 'running']:
new_data = {
'timestamp': time.time(),
}
new_data.update(self.sun_params)
try:
az, el = [self.data['status']['summary'][f'{ax}_current_position']
for ax in ['Azimuth', 'Elevation']]
if az is None or el is None:
raise KeyError
except KeyError:
az, el = None, None
try:
moveable = [bool(self.data['status']['platform_status'][k])
for k in ['Safe_mode', 'Remote_mode']]
moveable = (not moveable[0]) and moveable[1]
except KeyError:
moveable = False
no_map = self.sun is None
old_map = (not no_map
and self.sun._now() - self.sun.base_time > SUN_MAP_REFRESH)
do_recompute = (
not req_out
and (no_map or old_map or self.sun_params['recompute_req'])
)
if do_recompute:
req_out = True
self.sun_params['recompute_req'] = False
threads.deferToThread(_get_sun_map).addCallback(
_notify_recomputed)
new_data.update({
'sun_pos': {
'map_exists': not no_map,
'map_is_old': old_map,
'map_ref_time': None if no_map else self.sun.base_time,
'platform_azel': (az, el),
},
})
# Flags for unsafe position.
safety_known, danger_zone, warning_zone = False, False, False
# Flag for time shift during debugging.
sun_is_real = True
if self.sun is not None:
info = self.sun.get_sun_pos(az, el)
sun_is_real = ('WARNING' not in info)
new_data['sun_pos'].update(info)
if az is not None:
t = self.sun.check_trajectory([az], [el])['sun_time']
new_data['sun_pos']['sun_safe_time'] = t if t > 0 else 0
safety_known = True
danger_zone = (t < self.sun_params['policy']['min_sun_time'])
warning_zone = (t < self.sun_params['policy']['response_time'])
# Has a drill been requested?
drill_req = (self.sun_params['next_drill'] is not None
and self.sun_params['next_drill'] <= time.time())
# Should we be doing a escape_sun_now?
panic_for_real = safety_known and danger_zone and self._get_sun_policy('escape_enabled')
panic_for_fun = drill_req
# Is escape_sun_now task running?
ok, msg, _session = self.agent.status('escape_sun_now')
escape_in_progress = (_session.get('status', 'done') != 'done')
# Block motion as long as we are not sun-safe.
self.sun_params['block_motion'] = (panic_for_real or escape_in_progress)
new_data['avoidance'] = {
'safety_unknown': not safety_known,
'warning_zone': warning_zone,
'danger_zone': danger_zone,
'escape_triggered': panic_for_real,
'escape_active': escape_in_progress,
'last_escape_time': last_panic,
'sun_is_real': sun_is_real,
'platform_is_moveable': moveable,
}
if (panic_for_real or panic_for_fun):
now = time.time()
# Different retry conditions for moveable / not moveable
if moveable and (now - last_panic > 60.):
# When moveable, only attempt escape every 1 minute.
self.log.warn('monitor_sun is requesting escape_sun_now.')
self.agent.start('escape_sun_now')
last_panic = now
elif not moveable and (now - last_panic > 600.):
# When not moveable, only print complaint message every 10 minutes.
self.log.warn('monitor_sun cannot request escape_sun_now, '
'because platform not moveable by remote!')
last_panic = now
# Regardless, clear the drill indicator -- we don't
# want that to occur randomly later.
self.sun_params['next_drill'] = None
# Update session.
session.data.update(new_data)
# Publish -- only if we have the sun pos though..
if sun_is_real and safety_known and feed_pacer.next_sample <= time.time():
feed_pacer.sleep() # should be instantaneous, just update counters
block = {'timestamp': time.time(),
'block_name': 'sun0',
'data': {}}
for kshort, (keys, cast) in feed_keys.items():
block['data'][kshort] = cast(lookup(keys, new_data))
self.agent.publish_to_feed('sun', block)
yield dsleep(1)
return True, 'monitor_sun exited cleanly.'
[docs]
@ocs_agent.param('reset', type=bool, default=None)
@ocs_agent.param('enable', type=bool, default=None)
@ocs_agent.param('temporary_disable', type=float, default=None)
@ocs_agent.param('escape', type=bool, default=None)
@ocs_agent.param('exclusion_radius', type=float, default=None)
@ocs_agent.param('shift_sun_hours', type=float, default=None)
def update_sun(self, session, params):
"""update_sun(reset=None, enable=None, temporary_disable=None, \
escape=None, exclusion_radius=None, \
shift_sun_hours=None)
**Task** - Update Sun monitoring and avoidance parameters.
All arguments are optional.
Args:
reset (bool): If True, reset all sun_params to the platform
defaults. (The "defaults" includes any overrides
specified on Agent command line.)
enable (bool): If True, enable active Sun avoidance. If
avoidance was temporarily disabled it is re-enabled. If
False, disable active Sun avoidance (non-temporarily).
temporary_disable (float): If set, disable Sun avoidance for
this number of seconds.
escape (bool): If True, schedule an escape drill for 10
seconds from now.
exclusion_radius (float): If set, change the FOV radius
(degrees), for Sun avoidance purposes, to this number.
shift_sun_hours (float): If set, compute the Sun position as
though it were this many hours in the future. This is for
debugging, testing, and work-arounds. Pass zero to
cancel.
"""
do_recompute = False
now = time.time()
self.log.info('update_sun params: {params}',
params={k: v for k, v in params.items()
if v is not None})
if params['reset']:
self._reset_sun_params()
do_recompute = True
if params['enable'] is not None:
self.sun_params['active_avoidance'] = params['enable']
self.sun_params['disable_until'] = 0
if params['temporary_disable'] is not None:
self.sun_params['disable_until'] = params['temporary_disable'] + now
if params['escape']:
self.log.warn('Setting sun escape drill to start in 10 seconds.')
self.sun_params['next_drill'] = now + 10
if params['exclusion_radius'] is not None:
self.sun_params['policy']['exclusion_radius'] = \
params['exclusion_radius']
do_recompute = True
if params['shift_sun_hours'] is not None:
self.sun_params['safety_map_kw']['sun_time_shift'] = \
params['shift_sun_hours'] * 3600
do_recompute = True
if do_recompute:
self.sun_params['recompute_req'] = True
return True, 'Params updated.'
[docs]
@ocs_agent.param('_')
@inlineCallbacks
def escape_sun_now(self, session, params):
"""escape_sun_now()
**Task** - Take control of the platform, and move it to a
Sun-Safe position. This will abort/stop any current go_to or
generate_scan, identify the safest possible path to North or
South (without changing elevation, if possible), and perform
the moves to get there.
"""
state = 'init'
last_state = state
session.data = {'state': state,
'timestamp': time.time()}
while session.status in ['starting', 'running'] and state not in ['escape-done']:
az, el = [self.data['status']['summary'][f'{ax}_current_position']
for ax in ['Azimuth', 'Elevation']]
if state == 'init':
state = 'escape-abort'
elif state == 'escape-abort':
# raise stop flags and issue stop on motion ops
for op in ['generate_scan', 'go_to']:
self.agent.stop(op)
self.agent.abort(op)
state = 'escape-wait-idle'
timeout = 30
elif state == 'escape-wait-idle':
for op in ['generate_scan', 'go_to']:
ok, msg, _session = self.agent.status(op)
if _session.get('status', 'done') != 'done':
break
else:
state = 'escape-move'
last_move = time.time()
timeout -= 1
if timeout < 0:
state = 'escape-stop'
elif state == 'escape-stop':
yield self._stop()
state = 'escape-move'
last_move = time.time()
elif state == 'escape-move':
self.log.info('Getting escape path for (t, az, el) = '
'(%.1f, %.3f, %.3f)' % (time.time(), az, el))
escape_path = self.sun.find_escape_paths(az, el)
if escape_path is None:
self.log.error('Failed to find acceptable path; using '
'failsafe (South, low el).')
legs = [(180., max(self.sun_params['policy']['min_el'], 0))]
else:
legs = escape_path['moves'].nodes[1:]
self.log.info('Escaping to (az, el)={pos} ({n} moves)',
pos=legs[-1], n=len(legs))
state = 'escape-move-legs'
leg_d = None
elif state == 'escape-move-legs':
def _leg_done(result):
nonlocal state, last_move, leg_d
all_ok, msg = result
if not all_ok:
self.log.error('Leg failed.')
# Recompute the escape path.
if time.time() - last_move > 60:
self.log.error('Too many failures -- giving up for now')
state = 'escape-done'
else:
state = 'escape-move'
else:
leg_d = None
last_move = time.time()
if not self._get_sun_policy('escape_enabled'):
state = 'escape-done'
if leg_d is None:
if len(legs) == 0:
state = 'escape-done'
else:
leg_az, leg_el = legs.pop(0)
leg_d = self._go_to_axes(session, az=leg_az, el=leg_el,
clear_faults=True)
leg_d.addCallback(_leg_done)
elif state == 'escape-done':
# This block won't run -- loop will exit.
pass
session.data['state'] = state
if state != last_state:
self.log.info('escape_sun_now: state is now "{state}"', state=state)
last_state = state
yield dsleep(1)
return True, "Exited."
def _check_scan_sunsafe(self, az1, az2, el, v_az, a_az):
"""This will return True if active avoidance is disabled. If active
avoidance is enabled, then it will only return true if the
planned scan seems to currently be sun-safe.
"""
if not self._get_sun_policy('sunsafe_moves'):
return True, 'Sun-safety checking is not enabled.'
if not self._get_sun_policy('map_valid'):
return False, 'Sun Safety Map not computed or stale; run the monitor_sun process.'
# Include a bit of buffer for turn-arounds.
az1, az2 = min(az1, az2), max(az1, az2)
turn = v_az**2 / a_az
az1 -= turn
az2 += turn
n = max(2, int(np.ceil((az2 - az1) / 1.)))
azs = np.linspace(az1, az2, n)
info = self.sun.check_trajectory(azs, azs * 0 + el)
safe = info['sun_time'] >= self.sun_params['policy']['min_sun_time']
if safe:
msg = 'Scan is safe for %.1f hours' % (info['sun_time'] / 3600)
else:
msg = 'Scan will be unsafe in %.1f hours' % (info['sun_time'] / 3600)
return safe, msg
def _get_sunsafe_moves(self, target_az, target_el):
"""Given a target position, find a Sun-safe way to get there. This
will either be a direct move, or else an ordered slew in az
before el (or vice versa).
If target_az or target_el are None, they are taken to be the
current axis position.
Returns (legs, msg). If legs is None, it indicates that no
Sun-safe path could be found; msg is an error message. If a
path can be found, the legs is a list of intermediate move
targets, ``[(az0, el0), (az1, el1) ...]``, terminating on
``(target_az, target_el)``. msg is None in that case. The
first position (az0, el0) is the current position of the
platform.
In the case that the platform is already at the target
position, the returned list will still have 2 entries.
When Sun avoidance is not enabled, this function returns as
though the direct path to the target is a safe one (though
axes_sequential=True may cause an intermediate step to be
added).
"""
# Get current position.
(az0, el0), msg = self._current_azel()
if az0 is None:
return None, msg
if target_az is None:
target_az = az0
if target_el is None:
target_el = el0
if not self._get_sun_policy('sunsafe_moves'):
if self.motion_limits.get('axes_sequential'):
# Move in az first, then el.
return [(target_az, el0), (target_az, target_el)], None
return [(az0, el0), (target_az, target_el)], None
if not self._get_sun_policy('map_valid'):
return None, 'Sun Safety Map not computed or stale; run the monitor_sun process.'
# Check the target position and block it outright.
if self.sun.check_trajectory([target_az], [target_el])['sun_time'] <= 0:
return None, 'Requested target position is not Sun-Safe.'
moves = self.sun.analyze_paths(az0, el0, target_az, target_el)
move, decisions = self.sun.select_move(moves)
if move is None:
return None, 'No Sun-Safe moves could be identified!'
legs = list(move['moves'].nodes)
if len(legs) == 1:
# Pad to two entries.
return [legs[0], legs[0]], None
return legs, None
#
# HWP State Safety
#
[docs]
@inlineCallbacks
def monitor_hwp(self, session, params):
"""monitor_hwp()
**Process** - Monitors the state of a HWP, by querying a
HWPSupervisor's ``monitor`` Process session data. Assesses
what motions are permitted, given the HWP state.
session.data example::
{
"interlocks_config": {
"configured": true,
"enabled": true,
"instance_id": "hwp-supervisor",
"limit_sun_avoidance": true,
"tolerance": 0.1,
},
"supervisor_data": {
"timestamp": 1744692973.185377,
"ok": true,
"err_msg": "",
"_grip_brakes": [1, 1, 1],
"_grip_state": "ungripped",
"_is_spinning": true,
"_target_freq": 2.1,
"grip_state": "ungripped",
"spin_state": "spinning",
"request_block_motion": null,
"request_block_motion_timestamp": null
},
"allowed": {
"el": [true, [40, 70], [
[40, 70],
],
"az": [true, [40, 70], [
[40, 70],
],
"third": [false, null, []],
}
}
"""
if not self.hwp_rules.configured:
session.data = {
'interlocks_config': self.hwp_rules.encoded(basic=True),
}
return False, "HWP Interlocks not configured - monitoring blocked."
def _update_sun_lims(el_range):
if el_range is None:
el_range = (self.motion_limits['elevation']['lower'],
self.motion_limits['elevation']['upper'])
# Is this a change to sun policy?
if tuple(el_range) != (self.sun_params['policy']['min_el'],
self.sun_params['policy']['max_el']):
self.sun_params['policy']['min_el'] = el_range[0]
self.sun_params['policy']['max_el'] = el_range[1]
self.sun_params['recompute_req'] = True
pacer = Pacemaker(1.)
last_enabled = False
hwp_supervisor = self.hwp_rules.get_client()
while session.status == 'running':
new_data = yield threads.deferToThread(hwp_supervisor.update)
new_sd = {
'interlocks_config': self.hwp_rules.encoded(basic=True),
'supervisor_data': new_data,
}
(_, el), msg = self._current_azel()
allowed = self.hwp_rules.test_range(
(None if el is None else (el, el)),
new_data.get('grip_state'),
new_data.get('spin_state'))
new_sd['allowed'] = allowed
session.data = new_sd
self.data['hwp'] = new_data
if self.hwp_rules.enabled:
if self.hwp_rules.limit_sun_avoidance and el is not None:
tol = self.hwp_rules.tolerance
if allowed['el'][0]:
_update_sun_lims(allowed['el'][1])
else:
# If we're in a forbidden spot ... just try to
# keep it at this el for now, and hopefully
# HWPSupervisor will improve things soon.
_update_sun_lims((el - tol, el + tol))
elif last_enabled and self.hwp_rules.limit_sun_avoidance:
# Restore the sun_params default elevation range.
_update_sun_lims(None)
last_enabled = self.hwp_rules.enabled
yield pacer.dsleep()
return True, "Bye."
[docs]
@ocs_agent.param('enable', type=bool, default=None)
def update_hwp(self, session, params):
"""update_hwp(enable=None)
**Task** - Update HWP state monitoring and safety parameters.
All arguments are optional.
Args:
enable (bool): If True, enable HWP state checks. If False,
disable HWP state checks (non-temporarily).
"""
self.log.info('update_hwp params: {params}',
params={k: v for k, v in params.items()
if v is not None})
if not self.hwp_rules.configured:
return False, 'HWP interlocks not configured in config file.'
if params['enable'] is not None:
self.hwp_rules.enabled = params['enable']
return True, 'Params updated.'
def _check_hwpsafe(self, el1, el2, axes, hwp_data=None):
"""Checks whether certain axis motions are permitted, over a
certain range of elevations.
Args:
el1, el2: elevation range over which the checks should be
considered.
axes: list of axes to check for permission on (taken from
'el', 'az', 'third').
hwp_data: dict from which to get grip_state and spin_state;
if not provided, uses self.data['hwp'].
Returns:
motions_permitted (bool): whether all requested axes are
permitted to move, given the HWP state and el range.
message (str): helpful text.
Note that when hwp_rules are not enabled, motions are
generally permitted by this function.
See additional helper functions, _check_hwpsafe_here and
_check_hwpsafe_legs.
"""
if not self.hwp_rules.enabled:
return (True, "HWP monitoring is disabled.")
# Grab a self-consistent copy...
if hwp_data is None:
hwp_data = self.data['hwp']
# Check staleness
if time.time() - hwp_data.get('timestamp', 0) > 10:
return False, "HWP monitoring dataset is stale; cannot validate move."
# Check it.
state_args = {
'el_range': [el1, el2],
'grip_state': hwp_data.get('grip_state'),
'spin_state': hwp_data.get('spin_state'),
}
axes_ok = self.hwp_rules.test_range(**state_args)
for ax in axes:
if not axes_ok[ax][0]:
return (False, (f"Motion in {ax} not permitted due to HWP rules "
f"for {state_args}."))
return axes_ok, 'All requested axes pass the HWP rules.'
def _check_hwpsafe_here(self, axes):
"""Check whether motion in ``axes`` is permitted, at the
present elevation and hwp state.
"""
if not self.hwp_rules.enabled:
return (True, "HWP monitoring is disabled.")
(_, el), msg = self._current_azel()
if el is None:
return False, f'HWP safety could not be ensured: {msg}'
return self._check_hwpsafe(el, el, axes)
def _check_hwpsafe_legs(self, legs):
"""Check whether motions specified by legs (list of (az, el)
positions) are permitted, given the current HWP state.
"""
if not self.hwp_rules.enabled:
return (True, "HWP monitoring is disabled.")
hwp_data = self.data['hwp']
for (az1, el1), (az2, el2) in zip(legs[:-1], legs[1:]):
axes = []
if abs(el2 - el1) > self.hwp_rules.tolerance:
axes.append('el')
if abs(az2 - az1) > self.hwp_rules.tolerance:
axes.append('az')
ok, msg = self._check_hwpsafe(el1, el2, axes, hwp_data=hwp_data)
if not ok:
return False, msg
return (True, "All moves passed HWP safety checks.")
#
# Exercise!
#
[docs]
@ocs_agent.param('action', choices=['open', 'close'])
@inlineCallbacks
def set_shutter(self, session, params):
"""set_shutter(action)
**Task** - Request a (LAT) shutter action, wait for it to
complete or fail.
Args:
action (str): 'open' or 'close'
Notes:
If the shutter reads out as in the requested state already,
then no action is taken and the task will quickly return as
succeeded.
"""
def log(msg):
session.add_message(msg)
log(f'requested action={params["action"]}')
if self.data['status'].get('shutter', {}).get('Shutter_open') is None:
return False, 'Shutter dataset does not seem to be populating.'
if params['action'] == 'open':
dset_cmd = 'ShutterOpen'
desired_key, undesired_key = 'Shutter_open', 'Shutter_closed'
else:
dset_cmd = 'ShutterClose'
desired_key, undesired_key = 'Shutter_closed', 'Shutter_open'
# This just needs to be longer than 1 loop time.
STATE_WAIT = 5.
# Shutter typically closes in ~45 seconds. But in early tests
# it sometimes takes an additional 45 seconds for moving->0.
MOVING_WAIT = 120.
state = 'init'
session.data = {'state': state,
'timestamp': time.time()}
while (session.status in ['starting', 'running']
and state not in ['done', 'error']):
last_state = state
now = time.time()
az, el = [self.data['status']['summary'][f'{ax}_current_position']
for ax in ['Azimuth', 'Elevation']]
shutter = self.data['status']['shutter']
for bad_key in ['Shutter_timeout', 'Shutter_failure']:
if shutter[bad_key]:
state = 'error'
message = f'Detected error state: {bad_key}'
if state in ['error', 'done']:
pass
elif state == 'init':
if shutter[desired_key] and not shutter[undesired_key]:
state = 'done'
message = f'Shutter already reporting state={desired_key}'
else:
# Issue the command
result = yield self.acu_control.Command(self.datasets['shutter'], dset_cmd)
if result in OK_RESPONSES:
state = 'wait-moving'
timeout = time.time() + STATE_WAIT
else:
state = 'error'
message = 'Failed to issue shutter command.'
elif state == 'wait-moving':
if now > timeout:
state = 'error'
message = 'Shutter failed to start moving.'
elif shutter['Shutter_moving']:
state = 'wait-stopped'
timeout = now + MOVING_WAIT
elif state == 'wait-stopped':
if now > timeout:
state = 'error'
message = 'Shutter will not stop moving.'
elif not shutter['Shutter_moving']:
state = 'wait-final'
timeout = now + STATE_WAIT
elif state == 'wait-final':
if now > timeout:
state = 'error'
message = 'Shutter failed to reach final expected state.'
elif shutter[desired_key] and not shutter[undesired_key]:
state = 'done'
message = 'Shutter move successful.'
else:
message = f'invalid state: {state}'
state = 'error'
session.data['state'] = state
if state != last_state:
log(f'set_shutter: state is now "{state}"')
last_state = state
yield dsleep(1)
if state == 'done':
return True, message
elif state == 'error':
return False, message
return False, 'Aborted in state {state}'
[docs]
@ocs_agent.param('starting_index', type=int, default=0)
def exercise(self, session, params):
"""exercise(starting_index=0)
**Process** - Run telescope platform through some pre-defined motions.
For historical reasons, this does not command agent functions
internally, but rather instantiates a *client* and calls the
agent as though it were an external entity.
"""
# Load the exercise plan.
plans = yaml.safe_load(open(self.exercise_plan, 'rb'))
super_plan = exercisor.get_plan(plans[self.acu_config_name])
session.data = {
'timestamp': time.time(),
'iterations': 0,
'attempts': 0,
'errors': 0,
}
def _publish_activity(activity):
msg = {
'block_name': 'A',
'timestamp': time.time(),
'data': {'activity': activity},
}
self.agent.publish_to_feed('activity', msg)
def _publish_error(delta_error=1):
session.data['errors'] += delta_error
msg = {
'block_name': 'B',
'timestamp': time.time(),
'data': {'error_count': session.data['errors']}
}
self.agent.publish_to_feed('activity', msg)
def _exit_now(ok, msg):
_publish_activity('idle')
self.agent.feeds['activity'].flush_buffer()
return ok, msg
_publish_activity('idle')
_publish_error(0)
target_instance_id = self.agent.agent_address.split('.')[-1]
exercisor.set_client(target_instance_id, self.agent.site_args)
settings = super_plan.get('settings', {})
plan_idx = 0
plan_t = None
for plan in super_plan['steps']:
plan['iter'] = iter(plan['driver'])
while session.status in ['running']:
time.sleep(1)
session.data['timestamp'] = time.time()
session.data['iterations'] += 1
# Fault maintenance
faults = exercisor.get_faults()
if faults['safe_lock']:
self.log.info('SAFE lock detected, exiting')
return _exit_now(False, 'Exiting on SAFE lock.')
if faults['local_mode']:
self.log.info('LOCAL mode detected, exiting')
return _exit_now(False, 'Exiting on LOCAL mode.')
if faults['az_summary']:
if session.data['attempts'] > 5:
self.log.info('Too many az summary faults, exiting.')
return _exit_now(False, 'Too many az summary faults.')
session.data['attempts'] += 1
self.log.info('az summary fault -- trying to clear.')
exercisor.clear_faults()
time.sleep(10)
continue
session.data['attempts'] = 0
# Plan execution
active_plan = super_plan['steps'][plan_idx]
if plan_t is None:
plan_t = time.time()
now = time.time()
if now - plan_t > active_plan['duration']:
plan_idx = (plan_idx + 1) % len(super_plan['steps'])
plan_t = None
continue
if settings.get('use_boresight'):
bore_target = random.choice(settings['boresight_opts'])
self.log.info(f'Setting boresight={bore_target}...')
_publish_activity('boresight')
exercisor.set_boresight(bore_target)
plan, info = next(active_plan['iter'])
self.log.info('Launching next scan. plan={plan}', plan=plan)
_publish_activity(active_plan['driver'].code)
ok = None
if 'targets' in plan:
exercisor.steps(**plan)
else:
exercisor.scan(**plan)
_publish_activity('idle')
if ok is None:
self.log.info('Scan completed without error.')
else:
self.log.info(f'Scan exited with error: {ok}')
_publish_error()
return _exit_now(True, "Stopped run process")
def add_agent_args(parser_in=None):
if parser_in is None:
parser_in = argparse.ArgumentParser()
pgroup = parser_in.add_argument_group('Agent Options')
pgroup.add_argument("--acu-config")
pgroup.add_argument("--no-processes", action='store_true',
default=False)
pgroup.add_argument("--ignore-axes", choices=['el', 'az', 'third', 'none'],
nargs='+', help="One or more axes to ignore.")
pgroup.add_argument("--disable-idle-reset", action='store_true',
help="Disable idle_reset, even for LAT.")
pgroup.add_argument("--min-el", type=float,
help="Override the minimum el defined in platform config.")
pgroup.add_argument("--max-el", type=float,
help="Override the maximum el defined in platform config.")
pgroup.add_argument("--disable-sun-avoidance", action='store_true',
help="Disable Sun Avoidance before startup.")
pgroup.add_argument("--disable-hwp-interlocks", action='store_true',
help="Disable HWP interlocks before startup.")
return parser_in
def main(args=None):
parser = add_agent_args()
args = site_config.parse_args(agent_class='ACUAgent',
parser=parser,
args=args)
agent, runner = ocs_agent.init_site_agent(args)
_ = ACUAgent(agent, args.acu_config,
startup=not args.no_processes,
ignore_axes=args.ignore_axes,
disable_idle_reset=args.disable_idle_reset,
disable_sun_avoidance=args.disable_sun_avoidance,
disable_hwp_interlocks=args.disable_hwp_interlocks,
min_el=args.min_el,
max_el=args.max_el)
runner.run(agent, auto_reconnect=True)
if __name__ == '__main__':
main()