# Sun Avoidance
#
# The docstring below is intended for injection into the documentation
# system.
"""When considering Sun Safety of a boresight pointing, we consider an
exclusion zone around the Sun with a user-specified radius. This is
called the ``exclusion_radius`` or the field-of-view radius.
The Safety of the instrument at any given moment is parametrized
by two numbers and a boolean:
``sun_dist``
The separation between the Sun and the boresight (az, el), in
degrees. This is defined without regards for the horizon --
i.e. even if the Sun and the boresight are both below "the
horizon", the sun distance can be small.
``sun_time``
The minimum time, in seconds, which must elapse before the current
(az, el) pointing of the boresight will lie within the exclusion
radius of the Sun. Although some positions will "never" see the
Sun, the sun_time for such positions is set to about 2 days, as a
place-holder for "the future". When the boresight is well below
the horizon, the sun_time is set to "never", i.e. 2 days. When
the sun is below the horizon, then the sun_time will always be at
least the time until next sun-rise.
``sun_down``
A boolean indicating whether the sun is below the horizon
elevation (which is a parameter of the instrument).
While the ``sun_dist`` and ``sun_down`` are important indicators of
whether the instrument is currently in immediate danger, the
``sun_time`` is useful for looking forward in order to avoid positions
that will soon be dangerous.
The user-defined policy for Sun Safety is captured in the following
settings:
``exclusion_radius``
The radius, in degres, of a disk centered on the Sun that must be
avoided by the boresight.
``min_sun_time``
An (az, el) position is considered unsafe (danger zone) if the
``sun_time`` is less than the ``min_sun_time``. (Expressed in
seconds.)
``response_time``
An (az, el) position is considered vulnerable (warning zone) if
the ``sun_time`` is less than the ``response_time``. This is
intended to represent the maximum amount of time it could take an
operator to reach the instrument and secure it, were motion to
stall unexpectedly. (Expressed in seconds.)
``el_horizon``
The elevation (in degrees) below which the Sun may be considered
as invisible to the instrument.
``el_dodging``
This setting affects how point-to-point motions are executed, with
respect to what elevations may be used in intermediate legs of the
trajectory. When this is False, the platform is restricted to
travel only at elevations that lie between the initial and the
target elevation. When True, the platform is permitted to travel
at other elevations, all the way up to the limits of the
platform. Using True is helpful to find Sun-safe trajectories in
some circumstances. But False is helpful if excess elevation
changes are potentially disturbing to the cryogenics. This
setting only affects point-to-point motions; "escape" paths will
always consider all available elevations.
``axes_sequential``
If True, a point-to-point motion will only be accepted if each leg
is a constant el or constant az move. When this setting is False,
legs that move simultaneously in az and el are permitted (and
probably preferred) as long as they are safe.
A "Sun-safe" position is a pointing of the boresight that currently
has a ``sun_time`` that meets or exceeds the ``min_sun_time``
parameter.
"""
import datetime
import functools
import math
import time
import ephem
import numpy as np
from pixell import enmap
from so3g.proj import coords, quat
try:
import pylab as pl
except ModuleNotFoundError:
pass
DEG = np.pi / 180
HOUR = 3600
DAY = 86400
SIDEREAL_DAY = 86164.0905
NO_TIME = DAY * 2
#: Default policy to apply when evaluating Sun-safety and planning
#: trajectories. Note the Agent code may apply different defaults,
#: based on known platform details.
DEFAULT_POLICY = {
'exclusion_radius': 20,
'min_el': 0,
'max_el': 90,
'min_az': -45,
'max_az': 405,
'el_horizon': 0,
'el_dodging': False,
'min_sun_time': HOUR,
'response_time': HOUR * 4,
'axes_sequential': False,
}
[docs]
class SunTracker:
"""Provide guidance on what horizion coordinate positions and
trajectories are sun-safe.
Args:
policy (dict): Exclusion policy parameters. See module
docstring, and DEFAULT_POLICY. The policy should also include
{min,max}\\_{el,az}, giving the limits supported by those axes.
site (EarthlySite or None): Site to use; default is the SO LAT.
If not None, pass an so3g.proj.EarthlySite or compatible.
map_res (float, deg): resolution to use for the Sun Safety Map.
sun_time_shift (float, seconds): For debugging and testing,
compute the Sun's position as though it were this many
seconds in the future. If None or zero, this feature is
disabled.
fake_now (float, seconds): For debugging and testing, replace
the tracker's computation of the current time (time.time())
with this value. If None, this testing feature is disabled.
compute (bool): If True, immediately compute the Sun Safety Map
by calling .reset().
base_time (unix timestamp): Store this base_time and, if compute
is True, pass it to .reset().
"""
def __init__(self, policy=None, site=None,
map_res=.5, sun_time_shift=None, fake_now=None,
compute=True, base_time=None):
# Note res is stored in radians.
self.res = map_res * DEG
if sun_time_shift is None:
sun_time_shift = 0.
self.sun_time_shift = sun_time_shift
self.fake_now = fake_now
self.base_time = base_time
# Process and store the instrument config and safety policy.
if policy is None:
policy = {}
for k in policy.keys():
assert k in DEFAULT_POLICY
_p = dict(DEFAULT_POLICY)
_p.update(policy)
self.policy = _p
if site is None:
# This is close enough.
site = coords.SITES['so_lat']
site_eph = ephem.Observer()
site_eph.lon = site.lon * DEG
site_eph.lat = site.lat * DEG
site_eph.elevation = site.elev
self._site = site_eph
if compute:
self.reset(base_time)
def _now(self):
if self.fake_now:
return self.fake_now
return time.time()
def _sun(self, t):
self._site.date = \
datetime.datetime.utcfromtimestamp(t)
return ephem.Sun(self._site)
@staticmethod
def _horizon_branch(az=0, el=0):
"""Given telescope boresight (el, az), which could be vectors,
return (az_can, el_can, inverted), where el_can is in the
branch most useful for Sun avoidance assessment, [-90, 90],
and az_can is adjusted such that (az, el) and (az_can, el_can)
correspond to same on-sky point. For such points where the
rebranching also implies an inversion of the focal plane,
inverted is True.
"""
q = quat.rotation_lonlat(-az * coords.DEG, el * coords.DEG)
lon, lat, phi = quat.decompose_lonlat(q)
inverted = np.zeros(lon.shape, bool)
inverted[abs(phi) > .001 * coords.DEG] = True
return -lon / coords.DEG, lat / coords.DEG, inverted
[docs]
def reset(self, base_time=None):
"""Compute and store the Sun Safety Map for a specific
timestamp.
This basic computation is required prior to calling other
functions that use the Sun Safety Map.
"""
# Set a reference time -- the map of sun times is usable from
# this reference time to at least 12 hours in the future.
if base_time is None:
base_time = self._now()
# Map extends from dec -80 to +80.
shape, wcs = enmap.band_geometry(
dec_cut=80 * DEG, res=self.res, proj='car')
# Map where each pixel is distance to the Sun.
sun_dist = enmap.zeros(shape, wcs=wcs) - 1
# Map of time-to-sun-danger, when sun is above horizon.
sun_times_up = sun_dist.copy()
# Map of time-to-sun-danger, when sun is below horizon.
sun_times_dn = sun_dist.copy()
# Quaternion rotation for each point in the map.
dec, ra = sun_dist.posmap()
map_q = quat.rotation_lonlat(ra.ravel(), dec.ravel())
# Look up the sun position at our shifted time; but also shift
# explicitly in RA for earth rotation.
v = self._sun(base_time + self.sun_time_shift)
sun_dec = v.dec
sun_ra = v.ra - self.sun_time_shift * (2 * np.pi / SIDEREAL_DAY)
del v
# Compute the map of angular distance to the Sun, at
# base_time.
qsun = quat.rotation_lonlat(sun_ra, sun_dec)
qoff = ~qsun * map_q
sun_dist[:] = (quat.decompose_iso(~qoff)[0]
.reshape(sun_dist.shape) / DEG)
# For the sun_times_* maps, each pixel will give the time
# delay between base_time and the time when the sky coordinate
# will be inside the Sun exclusion mask.
dt = -ra[0] * DAY / (2 * np.pi)
# For sun_times_up (sun above horizon), the region around the
# sun is bad right now.
sun_times_up[sun_dist <= self.policy['exclusion_radius']] = 0.
# Loop over rows of the sun_times_* maps.
for g_up, g_dn in zip(sun_times_up, sun_times_dn):
if (g_up < 0).all():
# Sun mask does not touch this declination.
continue
# Identify pixel on the right of the masked region.
flips = ((g_up == 0)
* np.hstack((g_up[:-1] != g_up[1:],
g_up[-1] != g_up[0]))).nonzero()[0]
dt0 = dt[flips[0]]
_dt = (dt - dt0) % DAY
# For sun_times_up, fill only pixels outside sun mask.
g_up[g_up < 0] = _dt[g_up < 0]
# For sun_times_dn, fill all pixels.
g_dn[:] = _dt[:]
# Fill in remaining -1 with NO_TIME.
sun_times_up[sun_times_up < 0] = NO_TIME
sun_times_dn[sun_times_dn < 0] = NO_TIME
# Store the sun_times map and stuff.
self.base_time = base_time
self.sun_times_up = sun_times_up
self.sun_times_dn = sun_times_dn
self.sun_dist = sun_dist
self.map_q = map_q
def _azel_pix(self, az, el, dt=0, round=True, segments=False):
"""Return the pixel indices of the Sun Safety Map that are
hit by the trajectory (az, el) at time dt.
Args:
az (array of float, deg): Azimuth.
el (array of float, deg): Elevation.
dt (array of float, s): Time offset relative to the base
time, at which to evaluate the trajectory.
round (bool): If True, round results to integer (for easy
look-up in the map).
segments (bool): If True, split up the trajectory into
segments (a list of pix_ji sections) such that they don't
cross the map boundaries at any point.
"""
az = np.asarray(az)
el = np.asarray(el)
qt = coords.CelestialSightLine.naive_az_el(
self.base_time + dt, az * DEG, el * DEG).Q
ra, dec, _ = quat.decompose_lonlat(qt)
src_map = self.sun_dist # Only used for coordinate ops.
pix_ji = src_map.sky2pix((dec, ra))
if round:
pix_ji = pix_ji.round().astype(int)
# Handle out of bounds as follows:
# - RA indices are mod-ed into range.
# - dec indices are clamped to the map edge.
j, i = pix_ji
j[j < 0] = 0
j[j >= src_map.shape[-2]] = src_map.shape[-2] - 1
i[:] = i % src_map.shape[-1]
if segments:
jumps = ((abs(np.diff(pix_ji[0])) > src_map.shape[-2] / 2)
+ (abs(np.diff(pix_ji[1])) > src_map.shape[-1] / 2))
jump = jumps.nonzero()[0]
starts = np.hstack((0, jump + 1))
stops = np.hstack((jump + 1, len(pix_ji[0])))
return [pix_ji[:, a:b] for a, b in zip(starts, stops)]
return pix_ji
[docs]
def check_trajectory(self, az, el, t=None, raw=False):
"""For a telescope trajectory (vectors az, el, in deg), assumed to
occur at time t (defaults to now), get the minimum value of
the Sun Safety Map traversed by that trajectory. Also get the
minimum value of the Sun Distance map.
This requires the Sun Safety Map to have been computed with a
base_time in the 24 hours before t.
Returns a dict with entries:
- ``'sun_time'``: Minimum Sun Safety Time on the traj.
- ``'sun_time_start'``: Sun Safety Time at first point.
- ``'sun_time_stop'``: Sun Safety Time at last point.
- ``'sun_dist_min'``: Minimum distance to Sun, in degrees.
- ``'sun_dist_mean'``: Mean distance to Sun.
- ``'sun_dist_start'``: Distance to Sun, at first point.
- ``'sun_dist_stop'``: Distance to Sun, at last point.
"""
if t is None:
t = self._now()
az, el = np.asarray(az), np.asarray(el)
j, i = self._azel_pix(az, el, dt=t - self.base_time)
sun_delta = self.sun_times_up[j, i]
sun_dists = self.sun_dist[j, i]
# If the sun is below the horizon, sun times are modified.
az_sun, el_sun = self.get_sun_pos(t=t)['sun_azel']
if el_sun < self.policy['el_horizon']:
if (az_sun % 360.) > 180:
# The setting problem:
sun_delta = self.sun_times_dn[j, i]
else:
# The rising problem:
dt_rise = self._next_rise_time(t) - t
sun_delta[sun_delta < dt_rise] = dt_rise
# Positions below the modified horizon are always safe.
safe_el = self.policy['el_horizon'] - self.policy['exclusion_radius']
_, el_can, _ = self._horizon_branch(az, el)
sun_delta[el_can < safe_el] = NO_TIME
if raw:
return sun_delta, sun_dists
return {
'sun_time': sun_delta.min(),
'sun_time_start': sun_delta[0],
'sun_time_stop': sun_delta[-1],
'sun_dist_start': sun_dists[0],
'sun_dist_stop': sun_dists[-1],
'sun_dist_min': sun_dists.min(),
'sun_dist_mean': sun_dists.mean(),
}
[docs]
def get_sun_pos(self, az=None, el=None, t=None):
"""Get info on the Sun's location at time t. If (az, el) are also
specified, returns the angular separation between that
pointing and Sun's center.
"""
if t is None:
t = self._now()
eff_t = t + self.sun_time_shift
v = self._sun(eff_t)
qsun = quat.rotation_lonlat(v.ra, v.dec)
qzen = coords.CelestialSightLine.naive_az_el(eff_t, 0, np.pi / 2).Q
neg_sun_az, sun_el, _ = quat.decompose_lonlat(~qzen * qsun)
results = {
'sun_radec': (v.ra / DEG, v.dec / DEG),
'sun_azel': ((-neg_sun_az / DEG) % 360., sun_el / DEG),
'sun_down': sun_el / DEG < self.policy['el_horizon'],
}
if self.sun_time_shift != 0:
results['WARNING'] = 'Fake Sun Position is in use!'
if az is not None:
qtel = coords.CelestialSightLine.naive_az_el(
eff_t, az * DEG, el * DEG).Q
r = quat.decompose_iso(~qtel * qsun)[0]
results['sun_dist'] = r / DEG
_, el_can, _ = self._horizon_branch(az, el)
results['platform_down'] = \
el_can < (self.policy['el_horizon'] - self.policy['exclusion_radius'])
return results
def _next_rise_time(self, t):
"""Compute the smallest time, greater than or equal to t, at
which the Sun will be above the el_horizon. Accurate to 1s or
so.
"""
# Since the helper is cached, include anything in the args
# that could affect the computed value.
return self._next_rise_time_cache_helper(t, self.sun_time_shift)
@functools.lru_cache
def _next_rise_time_cache_helper(self, t, *args):
horizon = self.policy['el_horizon']
el0 = self.get_sun_pos(t=t)['sun_azel'][1]
if el0 > horizon:
return t
step = 3600
while abs(step) > 1.:
t += step
el = self.get_sun_pos(t=t)['sun_azel'][1]
if el > horizon:
step = -abs(step) / 2
else:
step = abs(step)
return t
[docs]
def show_map(self, axes=None, show=True):
"""Plot the Sun Safety Map and Sun Distance Map on the provided axes
(a list)."""
if axes is None:
fig, axes = pl.subplots(2, 1)
fig.tight_layout()
else:
fig = None
imgs = []
for axi, ax in enumerate(axes):
if axi == 0:
# Sun safe time
x = self.sun_times_up / HOUR
x[x == NO_TIME] = np.nan
title = 'Sun safe time (hours)'
elif axi == 1:
# Sun distance
x = self.sun_dist
title = 'Sun distance (degrees)'
im = ax.imshow(x, origin='lower', cmap='Oranges')
ji = self._azel_pix(0, np.array([90.]))
ax.scatter(ji[1], ji[0], marker='x', color='white')
ax.set_title(title)
pl.colorbar(im, ax=ax)
imgs.append(im)
if show:
pl.show()
return fig, axes, imgs
[docs]
def analyze_paths(self, az0, el0, az1, el1, t=None,
plot_file=None, dodging=True):
"""Design and analyze a number of different paths between (az0, el0)
and (az1, el1). Return the list, for further processing and
choice.
"""
if t is None:
t = self._now()
if plot_file:
assert (t == self.base_time) # Can only plot "now" results.
fig, axes, imgs = self.show_map(show=False)
last_el = None
# Test all trajectories with intermediate el.
all_moves = []
base = {
'req_start': (az0, el0),
'req_stop': (az1, el1),
'req_time': t,
'travel_el': (el0 + el1) / 2,
'travel_el_confined': True,
'direct': True,
}
# Suitable list of test els.
el_lims = [self.policy[_k] for _k in ['min_el', 'max_el']]
if el0 == el1:
el_nodes = [el0]
else:
el_nodes = sorted([el0, el1])
if dodging and (el_lims[0] < el_nodes[0]):
el_nodes.insert(0, el_lims[0])
if dodging and (el_lims[1] > el_nodes[-1]):
el_nodes.append(el_lims[1])
el_sep = 1.
el_cands = []
for i in range(len(el_nodes) - 1):
n = math.ceil((el_nodes[i + 1] - el_nodes[i]) / el_sep)
assert (n >= 1)
el_cands.extend(list(
np.linspace(el_nodes[i], el_nodes[i + 1], n + 1)[:-1]))
el_cands.append(el_nodes[-1])
for iel in el_cands:
detail = dict(base)
detail.update({
'direct': False,
'travel_el': iel,
'travel_el_confined': (iel >= min(el0, el1)) and (iel <= max(el0, el1)),
})
moves = MoveSequence(az0, el0, az0, iel, az1, iel, az1, el1, simplify=True)
detail['moves'] = moves
traj_info = self.check_trajectory(*moves.get_traj(), t=t)
detail.update(traj_info)
all_moves.append(detail)
if plot_file and (last_el is None or abs(last_el - iel) > 5):
c = 'black'
for j, i in self._azel_pix(*moves.get_traj(), round=True, segments=True):
for ax in axes:
a, = ax.plot(i, j, color=c, lw=1)
last_el = iel
# Include the direct path, but put in "worst case" details
# based on all "confined" paths computed above.
direct = dict(base)
direct['moves'] = MoveSequence(az0, el0, az1, el1, simplify=True)
traj_info = self.check_trajectory(*direct['moves'].get_traj(), t=t)
direct.update(traj_info)
conf = [m for m in all_moves if m['travel_el_confined']]
if len(conf):
for k in ['sun_time', 'sun_dist_min', 'sun_dist_mean']:
direct[k] = min([m[k] for m in conf])
all_moves.append(direct)
if plot_file:
# Add the direct traj, in blue.
segments = self._azel_pix(*direct['moves'].get_traj(), round=True, segments=True)
for ax in axes:
for j, i in segments:
ax.plot(i, j, color='blue')
for seg, rng, mrk in [(segments[0], slice(0, 1), 'o'),
(segments[-1], slice(-1, None), 'x')]:
ax.scatter(seg[1][rng], seg[0][rng], marker=mrk, color='blue')
# Add the selected trajectory in green.
selected = self.select_move(all_moves)[0]
if selected is not None:
traj = selected['moves'].get_traj()
segments = self._azel_pix(*traj, round=True, segments=True)
for ax in axes:
for j, i in segments:
ax.plot(i, j, color='green')
pl.savefig(plot_file)
return all_moves
[docs]
def find_escape_paths(self, az0, el0, t=None,
debug=False):
"""Design and analyze a number of different paths that move from (az0,
el0) to a sun safe position. Return the list, for further
processing and choice.
"""
if t is None:
t = self._now()
az_cands = []
_az = math.ceil(self.policy['min_az'] / 180) * 180
while _az <= self.policy['max_az']:
az_cands.append(_az)
_az += 180.
# Clip el0 into the allowed range.
el0 = np.clip(el0, self.policy['min_el'], self.policy['max_el'])
# Preference is to not change altitude; but allow for lowering
# (in absolute terms) -- there is "more sky" when you move
# away from zenith.
if el0 <= 90:
n_el = math.ceil(el0 - self.policy['min_el']) + 1
els = np.linspace(el0, self.policy['min_el'], n_el)
else:
n_el = math.ceil(self.policy['max_el'] - el0) + 1
els = np.linspace(el0, self.policy['max_el'], n_el)
path = None
for el1 in els:
paths = [self.analyze_paths(az0, el0, _az, el1, t=t, dodging=False)
for _az in az_cands]
best_paths = [self.select_move(p)[0] for p in paths]
best_paths = [p for p in best_paths if p is not None]
if len(best_paths):
path = self.select_move(best_paths)[0]
if debug:
cands, _ = self.select_move(best_paths, raw=True)
return cands
if path is not None:
return path
return None
[docs]
def select_move(self, moves, raw=False):
"""Given a list of possible "moves", select the best one.
The "moves" should be like the ones returned by
``analyze_paths``.
The best move is determined by first screening out dangerous
paths (ones that pass close to Sun, move closer to Sun
unnecessarily, violate axis limits, etc.) and then identifying
paths that minimize danger (distance to Sun; Sun time) and
path length.
If raw=True, a debugging output is returned; see code.
Returns:
(dict, list): (best_move, decisions)
``best_move`` -- the element of moves that is safest. If no
safe move was found, None is returned.
``decisions`` - List of dicts, in one-to-one correspondence
with ``moves``. Each decision dict has entries 'rejected'
(True or False) and 'reason' (string description of why the
move was rejected outright).
"""
_p = self.policy
decisions = [{'rejected': False,
'reason': None} for m in moves]
def reject(d, reason):
d['rejected'] = True
d['reason'] = reason
# According to policy, reject moves outright.
for m, d in zip(moves, decisions):
if d['rejected']:
continue
els = m['req_start'][1], m['req_stop'][1]
if _p['axes_sequential'] and m['moves'].includes_mixed_moves():
reject(d, 'Path includes simultaneous az+el legs (forbidden in policy).')
continue
if (m['sun_time_start'] < _p['min_sun_time']):
# If the path is starting in danger zone, then only
# enforce that the move takes the platform to a better place.
# Test > res, rather than > 0... near the minimum this
# can be noisy.
if m['sun_dist_start'] - m['sun_dist_min'] > self.res / DEG:
reject(d, 'Path moves even closer to sun.')
continue
if m['sun_time_stop'] < _p['min_sun_time']:
reject(d, 'Path does not end in sun-safe location.')
continue
elif m['sun_time'] < _p['min_sun_time']:
reject(d, 'Path too close to sun.')
continue
if m['travel_el'] < _p['min_el']:
reject(d, 'Path goes below minimum el.')
continue
if m['travel_el'] > _p['max_el']:
reject(d, 'Path goes above maximum el.')
continue
if not _p['el_dodging']:
if m['travel_el'] < min(*els):
reject(d, 'Path dodges (goes below necessary el range).')
continue
if m['travel_el'] > max(*els):
reject(d, 'Path dodges (goes above necessary el range).')
cands = [m for m, d in zip(moves, decisions)
if not d['rejected']]
if len(cands) == 0:
return None, decisions
def metric_func(m):
# Sorting key for move proposals. More preferable paths
# should have higher sort order.
azs = m['req_start'][0], m['req_stop'][0]
els = m['req_start'][1], m['req_stop'][1]
return (
# Low sun_time is bad, though anything longer
# than response_time is equivalent.
m['sun_time'] if m['sun_time'] < _p['response_time'] else _p['response_time'],
# Single leg moves are preferred, for simplicity.
m['direct'],
# Higher minimum sun distance is preferred.
m['sun_dist_min'],
# Shorter paths (less total az / el motion) are preferred.
-(abs(m['travel_el'] - els[0]) + abs(m['travel_el'] - els[1])),
-abs(azs[1] - azs[0]),
# Larger mean Sun distance is preferred. But this is
# subdominant to path length; otherwise spinning
# around a bunch of times can be used to lower the
# mean sun dist!
m['sun_dist_mean'],
# Prefer higher elevations for the move, all else being equal.
m['travel_el'],
)
cands.sort(key=metric_func)
if raw:
return [(c, metric_func(c)) for c in cands], decisions
return cands[-1], decisions
class MoveSequence:
def __init__(self, *args, simplify=False):
"""Container for a series of (az, el) positions. Pass the
positions to the constructor as (az, el) tuples::
MoveSequence((60, 180), (60, 90), (50, 90))
or equivalently as individual arguments::
MoveSequence(60, 180, 60, 90, 50, 90)
If simplify=True is passed, then any immediate position
repetitions are deleted.
"""
self.nodes = []
if len(args) == 0:
return
is_tuples = [isinstance(a, tuple) for a in args]
if all(is_tuples):
pass
elif any(is_tuples):
raise ValueError('Constructor accepts tuples or az, el, az, el; not a mix.')
else:
assert (len(args) % 2 == 0)
args = [(args[i], args[i + 1]) for i in range(0, len(args), 2)]
for (az, el) in args:
self.nodes.append((float(az), float(el)))
if simplify:
# Remove repeated nodes.
idx = 0
while idx < len(self.nodes) - 1:
if self.nodes[idx] == self.nodes[idx + 1]:
self.nodes.pop(idx + 1)
else:
idx += 1
def get_legs(self):
"""Iterate over the legs of the MoveSequence; yields each ((az_start,
el_start), (az_end, az_end)).
"""
for i in range(len(self.nodes) - 1):
yield self.nodes[i:i + 2]
def get_traj(self, res=0.5):
"""Return (az, el) vectors with the full path for the MoveSequence.
No step in az or el will be greater than res.
"""
if len(self.nodes) == 1:
return np.array([self.nodes[0][0]]), np.array([self.nodes[0][1]])
xx, yy = [], []
for (x0, y0), (x1, y1) in self.get_legs():
n = max(2, math.ceil(abs(x1 - x0) / res), math.ceil(abs(y1 - y0) / res))
xx.append(np.linspace(x0, x1, n))
yy.append(np.linspace(y0, y1, n))
return np.hstack(tuple(xx)), np.hstack(tuple(yy))
def includes_mixed_moves(self):
"""Returns True if any of the legs include simultaneous
changes in az and el.
"""
for (x0, y0), (x1, y1) in self.get_legs():
if (x0 != x1) and (y0 != y1):
return True
return False