import calendar
import datetime
import math
import pickle
import time
from dataclasses import dataclass, replace
import numpy as np
#: Number of seconds in one day.
DAY = 24 * 60 * 60

#: Minimum number of points to group together at the start of a new
#: leg, so that a ProgramTrack error is not triggered.
MIN_GROUP_NEW_LEG = 4

#: Registry of turn-around profile types (name -> integer code).
TURNAROUNDS_ENUM = dict(standard=0)
def _progtrack_format_time(timestamp):
fmt = '%j, %H:%M:%S'
return (time.strftime(fmt, time.gmtime(timestamp))
+ '{:.6f}'.format(timestamp % 1.)[1:])
class FromFileScan:
    """Container for a ProgramTrack trajectory loaded from a file.

    Only attribute annotations are declared here; no values are
    assigned at class level.  ``from_file`` creates an empty instance
    and then assigns the attributes one by one.
    """
    # Duration (s) of one loop iteration; from_file sets 0. when the
    # track is not loopable.
    loop_time: float
    # Whether the track is to be run in "free form" mode (see the
    # from_file docstring for the meaning of that setting).
    free_form: bool
    # The trajectory, as a list of TrackPoint.
    points: list
    # Number of leading points that are not part of the loopable
    # portion of the track.
    preamble_count: int
    # Smallest time separation (s) between consecutive points.
    step_time: float
    # (min, max) of the az positions in the track.
    az_range: tuple
    # (min, max) of the el positions in the track.
    el_range: tuple
[docs]
@dataclass
class TrackPoint:
    """A single point of a ProgramTrack trajectory.

    The first five fields (timestamp, positions and velocities) are
    required; the three flag fields default to 0.  Field order
    matters: other code in this module builds instances positionally.
    """
    #: Timestamp of the point (unix timestamp)
    timestamp: float
    #: Azimuth (deg).
    az: float
    #: Elevation (deg).
    el: float
    #: Azimuth velocity (deg/s).
    az_vel: float
    #: Elevation velocity (deg/s).
    el_vel: float

    #: Az flag: 0 if stationary, 1 if non-final point of const-vel
    #: scan segment; 2 if final point of const-vel segment.
    az_flag: int = 0
    #: El flag: like az_flag but for el.
    el_flag: int = 0

    #: If 1, indicates that once this point is uploaded the next point
    #: in sequence also needs to be soon uploaded.  Used at start of a
    #: new const-vel scan segment.
    group_flag: int = 0
def track_point_time_shift(p, dt):
    """Return a copy of point ``p`` with its timestamp moved by ``dt``.

    Args:
        p: A dataclass instance with a ``timestamp`` field (e.g. a
            TrackPoint).  It is not modified.
        dt (float): Number of seconds to add to the timestamp.

    Returns:
        A new object of the same type as ``p``; all other fields are
        copied unchanged.
    """
    shifted_timestamp = p.timestamp + dt
    return replace(p, timestamp=shifted_timestamp)
[docs]
def get_track_points_text(tpl, timestamp_offset=None, with_group_flag=False,
                          text_block=False):
    """Render TrackPoints as ProgramTrack text lines for upload to ACU.

    Each point becomes one line, terminated by CR-LF, of the form
    ``time; az; el; az_vel; el_vel; az_flag; el_flag``.

    Args:
        tpl (list): list of TrackPoint to convert.
        timestamp_offset (float): offset to add to every timestamp
            before rendering (None is treated as 0).
        with_group_flag (bool): If True, return each line as a tuple
            (group_flag, text).  Ignored when text_block is True.
        text_block (bool): If True, return all lines joined together
            into a single string.

    Returns:
        A single str if text_block is set; otherwise a list with one
        entry (str, or (group_flag, str) tuple) per point.
    """
    offset = 0 if timestamp_offset is None else timestamp_offset
    template = ('{t}; {p.az:.6f}; {p.el:.6f}; {p.az_vel:.4f}; '
                '{p.el_vel:.4f}; {p.az_flag}; {p.el_flag}\r\n')

    # First pass: format all the times.  Second pass: assemble lines.
    stamps = []
    for point in tpl:
        stamps.append(_progtrack_format_time(point.timestamp + offset))
    rendered = []
    for point, stamp in zip(tpl, stamps):
        rendered.append(template.format(p=point, t=stamp))

    if text_block:
        return ''.join(rendered)
    if not with_group_flag:
        return rendered
    return [(point.group_flag, text) for point, text in zip(tpl, rendered)]
[docs]
def from_file(filename, fmt=None):
    """Load a ProgramTrack trajectory from a file.  This function
    supports two formats.  The modern format is a pickle file.  The
    older numpy format is also supported.

    Parameters:
        filename (str): Full path to the file.
        fmt (str): Optional, one of "pickle" or "numpy".  If this is
            unspecified, the code will assume pickle format unless the
            filename ends in "npy".

    Returns:
        FromFileScan: Object containing the loaded points and
        supporting config for ProgramTrack mode.

    Raises:
        ValueError: if fmt is not recognized, if a numpy file does not
            hold 5 or 7 fields, or if a pickle file lacks one of the
            required vectors.

    Notes:
        For the pickle-based file format, the file must encode a
        single dict.  Here is a minimal example::

            {
                'timestamp': [1747233498, 1747233499, ..., 1747233523],
                'az': [180, 181, ..., 160],
                'el': [60, 60, ..., 60],
            }

        Those 3 required entries are "vectors", with the same
        length.  The vectors may be any iterable type but please use
        lists or ndarrays.  Optionally, the user may specify these other
        vectors:

        - 'az_vel': the az velocity at each point.
        - 'el_vel': the el velocity at each point.
        - 'az_flag': the az flag.
        - 'el_flag': the el flag.
        - 'group_flag': the point grouping flag.

        If not provided, the "vel" vectors will be computed from the
        gradient of the position vectors.  The "flag" vectors will
        default to all 0.  See :class:`TrackPoint
        <socs.agents.acu.drivers.TrackPoint>` for purpose of the various
        flags.

        The following settings (stated with their default values) may
        also be included in the dict:

        - 'free_form' (bool, False): Specifies whether the track
          should be run with the turn-around profiler disabled and
          spline (rather than linear) interpolation enabled.  When
          false, the ACU will do linear interpolation *and* automatic
          profiling of turn-arounds, between constant velocity
          segments.  Setting free_form=False is appropriate for
          constant elevation, constant az speed scans.  Set
          free_form=True for more complex scans.
        - 'loopable' (bool, False): Specifies whether the track should
          be repeated, forever.  When this is the case, the final
          point of the track (i.e. the last point in all the vectors)
          is ignored *except for the timestamp*, which is used to set
          the time at which the next loop iteration is started.
        - 'preamble_count' (int, 0): If loopable, this specifies the
          number of points in the track (i.e. in each vector) that are
          not included in the loopable portion.  This permits the
          track to have a ramp-up, or partial initial scan segment,
          prior to entering the repeatable template.

        The older numpy-based format does not support the additional
        settings.  The numpy file must contain an iterable with 5 or 7
        entries, where all entries are 1-d arrays of the same length.
        The first 5 arrays will correspond to 'timestamp', 'az', 'el',
        'az_vel', 'el_vel'.  The 2 optional arrays are 'az_flag' and
        'el_flag'.

        Pickle files can execute arbitrary code when loaded; only load
        trajectory files from a trusted source.

    """
    if fmt is None:
        fmt = 'numpy' if filename.endswith('npy') else 'pickle'

    if fmt == 'numpy':
        info = np.load(filename)
        if len(info) not in [5, 7]:
            raise ValueError(f'Unexpected field count ({len(info)}) in {filename}')
        times, az, el, vaz, vel = info[:5]
        if len(info) == 5:
            az_flags = np.zeros(len(times), int)
            el_flags = az_flags
        else:
            az_flags = info[5].astype('int')
            el_flags = info[6].astype('int')
        output = FromFileScan()
        output.loop_time = 0.
        output.free_form = False
        # The numpy format has no looping / preamble settings; set the
        # attribute anyway so it can always be read from the result.
        output.preamble_count = 0
        output.step_time = np.diff(times).min()
        output.points = [TrackPoint(*a) for a in zip(
            times, az, el, vaz, vel, az_flags, el_flags)]
        output.az_range = (az.min(), az.max())
        output.el_range = (el.min(), el.max())

    elif fmt == 'pickle':
        # NOTE: pickle.load can run arbitrary code from the file; the
        # file must come from a trusted source.
        with open(filename, 'rb') as fin:
            data = pickle.load(fin)
        output = FromFileScan()
        output.loop_time = 0.
        output.free_form = data.get('free_form', False)
        keys = ['timestamp', 'az', 'el', 'az_vel', 'el_vel',
                'az_flag', 'el_flag', 'group_flag']
        vects = {k: data.get(k) for k in keys}
        missing = [k for k in ('timestamp', 'az', 'el') if vects[k] is None]
        if missing:
            raise ValueError(
                f'Missing required entries {missing} in {filename}')
        n = len(vects['az'])
        dtv = np.gradient(vects['timestamp'])
        if vects['az_vel'] is None:
            vects['az_vel'] = np.gradient(vects['az']) / dtv
        if vects['el_vel'] is None:
            vects['el_vel'] = np.gradient(vects['el']) / dtv
        for flag_key in ('az_flag', 'el_flag', 'group_flag'):
            if vects[flag_key] is None:
                vects[flag_key] = np.zeros(n, int)
        output.preamble_count = data.get('preamble_count', 0)
        if data.get('loopable'):
            # Measure repeat time.
            output.loop_time = (vects['timestamp'][-1]
                                - vects['timestamp'][output.preamble_count])
            # ... and drop last point.
            for k in keys:
                vects[k] = vects[k][:-1]
        columns = [vects[k] for k in keys]
        output.points = [TrackPoint(*row) for row in zip(*columns)]
        output.step_time = np.diff(vects['timestamp']).min()
        # The vectors may be plain lists, which have no .min() / .max();
        # go through ndarray views for the range computation.
        az_arr = np.asarray(vects['az'])
        el_arr = np.asarray(vects['el'])
        output.az_range = (az_arr.min(), az_arr.max())
        output.el_range = (el_arr.min(), el_arr.max())
    else:
        raise ValueError(f"Invalid fmt={fmt}")

    return output
[docs]
def timecode(acutime, now=None):
    """Takes the time code produced by the ACU status stream and returns
    a unix timestamp.

    Parameters:
        acutime (float): The time recorded by the ACU status stream,
            corresponding to the fractional day of the year.  The
            value 1.0 maps to 00:00:00 UTC on January 1.
        now (float): The time, as unix timestamp, to assume it is now.
            This is for testing, it defaults to time.time().

    Returns:
        float: the unix timestamp corresponding to acutime, in the
        year inferred from ``now``.

    """
    sec_of_day = (acutime - 1) * DAY
    if now is None:
        now = time.time()  # testing

    # This guard protects us at end of year, when time.time() and
    # acutime might correspond to different years: a late-in-year
    # acutime is matched against "30 days ago", an early-in-year one
    # against "30 days from now".
    shift = -30 * DAY if acutime > 180 else 30 * DAY
    # Timezone-aware conversion; datetime.utcfromtimestamp is
    # deprecated as of Python 3.12.
    context = datetime.datetime.fromtimestamp(
        now + shift, tz=datetime.timezone.utc)

    # Unix timestamp of 00:00:00 UTC, January 1 of the inferred year.
    year_start = calendar.timegm((context.year, 1, 1, 0, 0, 0))
    return year_start + sec_of_day
def _get_target_az(current_az, current_t, increasing, az_endpoint1, az_endpoint2, az_speed, az_drift):
# Return the next endpoint azimuth, based on current (az, t)
# and whether to move in +ve or -ve az direction.
#
# Includes the effects of az_drift, to keep the scan endpoints
# (at least at the end of a scan) on the drifted trajectories.
if increasing:
target = max(az_endpoint1, az_endpoint2)
else:
target = min(az_endpoint1, az_endpoint2)
if az_drift is not None:
v = az_speed if increasing else -az_speed
target = target + az_drift / (v - az_drift) * (
(target - current_az + v * current_t))
return target
[docs]
def generate_constant_velocity_scan(az_endpoint1, az_endpoint2, az_speed,
                                    acc, el_endpoint1, el_endpoint2,
                                    el_speed=0,
                                    num_batches=None,
                                    num_scans=None,
                                    start_time=None,
                                    wait_to_start=10.,
                                    step_time=1.,
                                    batch_size=500,
                                    az_start='mid_inc',
                                    az_first_pos=None,
                                    az_drift=None):
    """Python generator to produce times, azimuth and elevation positions,
    azimuth and elevation velocities, azimuth and elevation flags for
    arbitrarily long constant-velocity azimuth scans.

    Parameters:
        az_endpoint1 (float): azimuth endpoint for the scan start
        az_endpoint2 (float): second azimuth endpoint of the scan
        az_speed (float): speed of the constant-velocity azimuth
            motion.  Must be positive.
        acc (float): turnaround acceleration for the azimuth motion at
            the endpoints.  Must be positive.
        el_endpoint1 (float): elevation endpoint for the scan start.
            The elevation of every generated point is this value.
        el_endpoint2 (float): second elevation endpoint of the scan. For
            constant az scans, this must be equal to el_endpoint1.
            (Currently not used by this generator.)
        el_speed (float): speed of the elevation motion. For constant az
            scans, set to 0.0.  (Currently overridden to 0.)
        num_batches (int or None): sets the number of batches for the
            generator to create. Default value is None (interpreted as infinite
            batches).
        num_scans (int or None): if not None, limits the points
            returned to the specified number of constant velocity legs.
        start_time (float or None): a ctime at which to start the scan.
            Default is None, which is interpreted as starting now +
            wait_to_start.
        wait_to_start (float): number of seconds to wait between
            start_time and when the scan actually starts. Default is 10 seconds.
        step_time (float): time between points on the constant-velocity
            parts of the motion. Default value is 1.0 seconds. Minimum value is
            0.05 seconds.
        batch_size (int): number of values to produce in each iteration.
            Default is 500. Batch size is reset to the length of one leg of the
            motion if num_batches is not None.
        az_start (str): part of the scan to start at.  To start at one
            of the extremes, use 'az_endpoint1', 'az_endpoint2', or
            'end' (same as 'az_endpoint1').  To start in the midpoint
            of the scan use 'mid_inc' (for first half-leg to have
            positive az velocity), 'mid_dec' (negative az velocity),
            or 'mid' (velocity oriented towards endpoint2).
        az_first_pos (float): If not None, the first az scan will
            start at this position (but otherwise proceed in the same
            starting direction).
        az_drift (float): The rate (deg / s) at which to shift the
            scan endpoints in time.  This can be used to better track
            celestial sources in targeted scans.

    Yields:
        points (list): a list of TrackPoint objects.  The generator
        finishes once the exit condition, if defined, is met.

    Raises:
        ValueError: (on first iteration) if the az endpoints are
            equal, if az_speed or acc is not positive, if az_start is
            not recognized, or if step_time is below 0.05 s.

    """
    if az_endpoint1 == az_endpoint2:
        raise ValueError('Generator requires two different az endpoints!')
    # A zero or negative speed / acceleration would yield points that
    # never reach the target, or turn-arounds of non-positive (or
    # undefined) duration; reject them up front.
    if not az_speed > 0:
        raise ValueError('az_speed must be positive.')
    if not acc > 0:
        raise ValueError('acc must be positive.')

    # Force the el_speed to 0.  It matters because an el_speed in
    # ProgramTrack data that exceeds the ACU limits will cause the
    # point to be rejected, even if there's no motion in el planned
    # (which, at the time of this writing, there is not).
    el_speed = 0.

    # Note that starting scan direction gets modified, below,
    # depending on az_start.
    increasing = az_endpoint2 > az_endpoint1

    if az_start in ['az_endpoint1', 'az_endpoint2', 'end']:
        if az_start in ['az_endpoint1', 'end']:
            az = az_endpoint1
        else:
            az = az_endpoint2
            increasing = not increasing
    elif az_start in ['mid_inc', 'mid_dec', 'mid']:
        az = (az_endpoint1 + az_endpoint2) / 2
        if az_start == 'mid':
            pass
        elif az_start == 'mid_inc':
            increasing = True
        else:
            increasing = False
    else:
        raise ValueError(f'az_start value "{az_start}" not supported. Choose from '
                         'az_endpoint1, az_endpoint2, end, mid_inc, mid_dec, mid')
    az_vel = az_speed if increasing else -az_speed

    # Bias the starting point for the first leg?
    if az_first_pos is not None:
        az = az_first_pos

    if start_time is None:
        t0 = time.time() + wait_to_start
    else:
        t0 = start_time
    # t is the time relative to t0; point timestamps are t + t0.
    t = 0
    # Time allotted between the last point of one leg and the first
    # point of the next.
    turntime = 2.0 * az_speed / acc
    el = el_endpoint1
    if step_time < 0.05:
        raise ValueError('Time step size too small, must be at least '
                         '0.05 seconds')
    # Az distance covered in one time step.
    daz = step_time * az_speed
    el_vel = el_speed
    az_flag = 0
    el_flag = 0
    if num_batches is None:
        stop_iter = float('inf')
    else:
        stop_iter = num_batches
        batch_size = int(np.ceil(abs(az_endpoint2 - az_endpoint1) / daz))

    def dec_num_scans():
        nonlocal num_scans
        if num_scans is not None:
            num_scans -= 1

    def check_num_scans():
        return num_scans is None or num_scans > 0

    target_az = _get_target_az(az, t, increasing, az_endpoint1, az_endpoint2, az_speed, az_drift)
    # Number of upcoming points that still need group_flag=1.
    point_group_batch = 0

    i = 0
    while i < stop_iter and check_num_scans():
        i += 1
        point_block = []
        for _ in range(batch_size):
            # Emit the current state, then compute the next one.
            point_block.append(TrackPoint(
                timestamp=t + t0,
                az=az, el=el, az_vel=az_vel, el_vel=el_vel,
                az_flag=az_flag, el_flag=el_flag,
                group_flag=int(point_group_batch > 0)))

            if point_group_batch > 0:
                point_group_batch -= 1

            if increasing:
                if az <= (target_az - 2 * daz):
                    # Ordinary step along the leg.
                    t += step_time
                    az += daz
                    az_vel = az_speed
                    el_vel = el_speed
                    az_flag = 1
                    el_flag = 0
                elif az == target_az:
                    # Turn around.
                    t += turntime
                    az_vel = -1 * az_speed
                    el_vel = el_speed
                    az_flag = 1
                    el_flag = 0
                    increasing = False
                    target_az = _get_target_az(az, t, increasing, az_endpoint1, az_endpoint2, az_speed, az_drift)
                    dec_num_scans()
                    point_group_batch = MIN_GROUP_NEW_LEG - 1
                else:
                    # Final (possibly longer or shorter) step, landing
                    # exactly on the target.
                    time_remaining = (target_az - az) / az_speed
                    az = target_az
                    t += time_remaining
                    az_vel = az_speed
                    el_vel = el_speed
                    az_flag = 2
                    el_flag = 0
            else:
                if az >= (target_az + 2 * daz):
                    # Ordinary step along the leg.
                    t += step_time
                    az -= daz
                    az_vel = -1 * az_speed
                    el_vel = el_speed
                    az_flag = 1
                    el_flag = 0
                elif az == target_az:
                    # Turn around.
                    t += turntime
                    az_vel = az_speed
                    el_vel = el_speed
                    az_flag = 1
                    el_flag = 0
                    increasing = True
                    target_az = _get_target_az(az, t, increasing, az_endpoint1, az_endpoint2, az_speed, az_drift)
                    dec_num_scans()
                    point_group_batch = MIN_GROUP_NEW_LEG - 1
                else:
                    # Final step, landing exactly on the target.
                    time_remaining = (az - target_az) / az_speed
                    az = target_az
                    t += time_remaining
                    az_vel = -1 * az_speed
                    el_vel = el_speed
                    az_flag = 2
                    el_flag = 0

            if not check_num_scans():
                # Kill the velocity on the last point and exit -- this
                # was recommended at LAT FAT for smoothly stopping the
                # motion at end of program.
                point_block[-1].az_vel = 0
                point_block[-1].el_vel = 0
                break

        yield point_block
[docs]
def generate_type3_scan(az_endpoint1, az_endpoint2, az_speed,
                        acc, el_endpoint1, el_endpoint2,
                        el_freq=.15,
                        az_vel_ref=None,
                        num_batches=None,
                        num_scans=None,
                        start_time=None,
                        wait_to_start=10.,
                        step_time=1.,
                        batch_size=500,
                        az_start='mid_inc',
                        az_first_pos=None,
                        az_drift=None):
    """Python generator to produce times, azimuth and elevation positions,
    azimuth and elevation velocities, azimuth and elevation flags for
    arbitrarily long type 3 scan.

    In this scan the az velocity at each point is ``az_speed`` divided
    by ``sin(az - (az_vel_ref - 90 deg))``, and (between turn-arounds)
    the elevation follows a sinusoid in time about the mean of the two
    el endpoints.

    Parameters:
        az_endpoint1 (float): azimuth endpoint for the scan start
        az_endpoint2 (float): second azimuth endpoint of the scan
        az_speed (float): speed parameter of the azimuth motion (the
            az velocity at az_vel_ref).  Must be positive.
        acc (float): turnaround acceleration for the azimuth motion at
            the endpoints.  Must be positive.
        el_endpoint1 (float): elevation endpoint for the scan start
        el_endpoint2 (float): second elevation endpoint of the scan.
        el_freq(float): frequency of the elevation nods in Hz.
        az_vel_ref(float or None): azimuth to center the velocity profile at.
            If None then the average of the endpoints is used.
        num_batches (int or None): sets the number of batches for the
            generator to create. Default value is None (interpreted as infinite
            batches).
        num_scans (int or None): if not None, limits the points
            returned to the specified number of constant velocity legs.
        start_time (float or None): a ctime at which to start the scan.
            Default is None, which is interpreted as starting now +
            wait_to_start.
        wait_to_start (float): number of seconds to wait between
            start_time and when the scan actually starts. Default is 10 seconds.
        step_time (float): time between points on the constant-velocity
            parts of the motion. Default value is 1.0 seconds. Minimum value is
            0.05 seconds.
        batch_size (int): number of values to produce in each iteration.
            Default is 500. Batch size is reset to the length of one leg of the
            motion if num_batches is not None.
        az_start (str): part of the scan to start at.  To start at one
            of the extremes, use 'az_endpoint1', 'az_endpoint2', or
            'end' (same as 'az_endpoint1').  To start in the midpoint
            of the scan use 'mid_inc' (for first half-leg to have
            positive az velocity), 'mid_dec' (negative az velocity),
            or 'mid' (velocity oriented towards endpoint2).
        az_first_pos (float): If not None, the first az scan will
            start at this position (but otherwise proceed in the same
            starting direction).
        az_drift (float): Must be None; az drift is not supported for
            this scan type.

    Yields:
        points (list): a list of TrackPoint objects.  The generator
        finishes once the exit condition, if defined, is met.

    Raises:
        ValueError: (on first iteration) if the az endpoints are
            equal, if az_speed or acc is not positive, if az_drift is
            not None, if an az endpoint is more than 70 degrees from
            az_vel_ref, if az_start is not recognized, or if step_time
            is below 0.05 s.

    """
    def get_scan_time(az0, az1, az_speed, az_cent):
        # Time to travel from az0 to az1 under the 1/sin velocity
        # profile (the integral of sin(az - az_cent) / az_speed).
        upper = -1 * np.cos(np.deg2rad(az1 - az_cent))
        lower = -1 * np.cos(np.deg2rad(az0 - az_cent))
        return abs(upper - lower) / np.deg2rad(az_speed)

    if az_endpoint1 == az_endpoint2:
        raise ValueError('Generator requires two different az endpoints!')
    # With numpy arithmetic a zero speed / acceleration would silently
    # turn into inf / nan values rather than an error; reject
    # non-positive values up front.
    if not az_speed > 0:
        raise ValueError('az_speed must be positive.')
    if not acc > 0:
        raise ValueError('acc must be positive.')
    if az_drift is not None:
        raise ValueError("Az drift not supported for type 2 or 3 scans!")

    # Get center of az range
    if az_vel_ref is None:
        az_vel_ref = (az_endpoint1 + az_endpoint2) / 2.
    az_cent = az_vel_ref - 90
    if any(abs(_az - az_vel_ref) > 70.
           for _az in [az_endpoint1, az_endpoint2]):
        raise ValueError("Az limits for type 2 and 3 scans must not be more than 70 "
                         "degrees away from az_vel_ref.")

    # Get el throw
    el_throw = abs(el_endpoint2 - el_endpoint1) / 2
    el_cent = (el_endpoint1 + el_endpoint2) / 2.

    # Note that starting scan direction gets modified, below,
    # depending on az_start.
    increasing = az_endpoint2 > az_endpoint1

    if az_start in ['az_endpoint1', 'az_endpoint2', 'end']:
        if az_start in ['az_endpoint1', 'end']:
            az = az_endpoint1
        else:
            az = az_endpoint2
            increasing = not increasing
    elif az_start in ['mid_inc', 'mid_dec', 'mid']:
        az = (az_endpoint1 + az_endpoint2) / 2
        if az_start == 'mid':
            pass
        elif az_start == 'mid_inc':
            increasing = True
        else:
            increasing = False
    else:
        raise ValueError(f'az_start value "{az_start}" not supported. Choose from '
                         'az_endpoint1, az_endpoint2, end, mid_inc, mid_dec, mid')
    az_vel = az_speed if increasing else -az_speed

    # Bias the starting point for the first leg?
    if az_first_pos is not None:
        az = az_first_pos

    if start_time is None:
        t0 = time.time() + wait_to_start
    else:
        t0 = start_time

    # Az velocity at each endpoint, and the turn-around times derived
    # from them.  tt[1] is applied after an increasing leg, tt[-1]
    # after a decreasing leg.
    vel_0 = az_speed / np.sin(np.deg2rad(az_endpoint1 - az_cent))
    vel_1 = az_speed / np.sin(np.deg2rad(az_endpoint2 - az_cent))
    # NOTE(review): the numeric constants in min_tt are not explained
    # anywhere in this file -- confirm their origin before changing.
    min_tt = {1: (0.85 * abs(vel_0) / 9 * 11.616)**.5, -1: (0.85 * abs(vel_1) / 9 * 11.616)**.5}
    tt = {1: max(2 * vel_0 / acc, min_tt[1]), -1: max(2 * vel_1 / acc, min_tt[-1])}
    # t is the time relative to t0; point timestamps are t + t0.
    t = 0
    el = el_endpoint1
    if step_time < 0.05:
        raise ValueError('Time step size too small, must be at least '
                         '0.05 seconds')
    el_vel = el_throw * el_freq * 2 * np.pi * np.cos(t * el_freq * 2 * np.pi)
    az_flag = 0
    el_flag = 0
    if num_batches is None:
        stop_iter = float('inf')
    else:
        stop_iter = num_batches
        batch_size = int(np.ceil(get_scan_time(az_endpoint1, az_endpoint2, az_speed, az_cent) / step_time))

    def dec_num_scans():
        nonlocal num_scans
        if num_scans is not None:
            num_scans -= 1

    def check_num_scans():
        return num_scans is None or num_scans > 0

    target_az = _get_target_az(az, t, increasing, az_endpoint1, az_endpoint2, az_speed / np.sin(np.deg2rad(az - az_cent)), az_drift)
    # Number of upcoming points that still need group_flag=1.
    point_group_batch = 0

    i = 0
    while i < stop_iter and check_num_scans():
        i += 1
        point_block = []
        for _ in range(batch_size):
            # Emit the current state (with the position-dependent az
            # velocity scaling applied), then compute the next one.
            point_block.append(TrackPoint(
                timestamp=t + t0,
                az=az, el=el, az_vel=az_vel / np.sin(np.deg2rad(az - az_cent)), el_vel=el_vel,
                az_flag=az_flag, el_flag=el_flag,
                group_flag=int(point_group_batch > 0)))

            if point_group_batch > 0:
                point_group_batch -= 1

            if increasing:
                if get_scan_time(az, target_az, az_speed, az_cent) > 2 * step_time:
                    # Ordinary step along the leg.
                    t += step_time
                    az += step_time * az_speed / np.sin(np.deg2rad(az - az_cent))
                    el = el_cent + el_throw * np.sin(t * el_freq * 2 * np.pi)
                    az_vel = az_speed
                    el_vel = el_throw * el_freq * 2 * np.pi * np.cos(t * el_freq * 2 * np.pi)
                    az_flag = 1
                    el_flag = 0
                elif az == target_az:
                    # Turn around.
                    t += tt[1]
                    az_vel = -1 * az_speed
                    el_vel = 0
                    az_flag = 1
                    el_flag = 0
                    increasing = False
                    target_az = _get_target_az(az, t, increasing, az_endpoint1, az_endpoint2, az_speed / np.sin(np.deg2rad(az - az_cent)), az_drift)
                    dec_num_scans()
                    point_group_batch = MIN_GROUP_NEW_LEG - 1
                else:
                    # Final step, landing exactly on the target.
                    time_remaining = get_scan_time(az, target_az, az_speed, az_cent)
                    az = target_az
                    t += time_remaining
                    az_vel = az_speed
                    el_vel = 0
                    az_flag = 2
                    el_flag = 0
            else:
                if get_scan_time(az, target_az, az_speed, az_cent) > 2 * step_time:
                    # Ordinary step along the leg.
                    t += step_time
                    az -= step_time * az_speed / np.sin(np.deg2rad(az - az_cent))
                    el = el_cent + el_throw * np.sin(t * el_freq * 2 * np.pi)
                    az_vel = -1 * az_speed
                    el_vel = el_throw * el_freq * 2 * np.pi * np.cos(t * el_freq * 2 * np.pi)
                    az_flag = 1
                    el_flag = 0
                elif az == target_az:
                    # Turn around.
                    t += tt[-1]
                    az_vel = az_speed
                    el_vel = 0
                    az_flag = 1
                    el_flag = 0
                    increasing = True
                    target_az = _get_target_az(az, t, increasing, az_endpoint1, az_endpoint2, az_speed / np.sin(np.deg2rad(az - az_cent)), az_drift)
                    dec_num_scans()
                    point_group_batch = MIN_GROUP_NEW_LEG - 1
                else:
                    # Final step, landing exactly on the target.
                    time_remaining = get_scan_time(az, target_az, az_speed, az_cent)
                    az = target_az
                    t += time_remaining
                    az_vel = -1 * az_speed
                    el_vel = 0
                    az_flag = 2
                    el_flag = 0

            if not check_num_scans():
                # Kill the velocity on the last point and exit -- this
                # was recommended at LAT FAT for smoothly stopping the
                # motion at end of program.
                point_block[-1].az_vel = 0
                point_block[-1].el_vel = 0
                break

        yield point_block
[docs]
def generate_type2_scan(az_endpoint1, az_endpoint2, az_speed,
                        acc, el_endpoint1,
                        az_vel_ref=None,
                        num_batches=None,
                        num_scans=None,
                        start_time=None,
                        wait_to_start=10.,
                        step_time=1.,
                        batch_size=500,
                        az_start='mid_inc',
                        az_first_pos=None,
                        az_drift=None):
    """Create a generator of TrackPoint batches for a type 2 scan.

    This is a thin wrapper: it runs generate_type3_scan with both
    elevation endpoints set to el_endpoint1 and with el_freq=0.  All
    other arguments are handed through unchanged.

    Parameters:
        az_endpoint1 (float): azimuth endpoint for the scan start
        az_endpoint2 (float): second azimuth endpoint of the scan
        az_speed (float): speed parameter of the azimuth motion
        acc (float): turnaround acceleration for the azimuth motion at the
            endpoints
        el_endpoint1 (float): elevation for the scan (used as both
            elevation endpoints).
        az_vel_ref(float or None): azimuth to center the velocity profile at.
            If None then the average of the endpoints is used.
        num_batches (int or None): sets the number of batches for the
            generator to create. Default value is None (interpreted as infinite
            batches).
        num_scans (int or None): if not None, limits the points
            returned to the specified number of constant velocity legs.
        start_time (float or None): a ctime at which to start the scan.
            Default is None, which is interpreted as starting now +
            wait_to_start.
        wait_to_start (float): number of seconds to wait between
            start_time and when the scan actually starts. Default is 10 seconds.
        step_time (float): time between points on the constant-velocity
            parts of the motion. Default value is 1.0 seconds. Minimum value is
            0.05 seconds.
        batch_size (int): number of values to produce in each iteration.
            Default is 500. Batch size is reset to the length of one leg of the
            motion if num_batches is not None.
        az_start (str): part of the scan to start at.  To start at one
            of the extremes, use 'az_endpoint1', 'az_endpoint2', or
            'end' (same as 'az_endpoint1').  To start in the midpoint
            of the scan use 'mid_inc' (for first half-leg to have
            positive az velocity), 'mid_dec' (negative az velocity),
            or 'mid' (velocity oriented towards endpoint2).
        az_first_pos (float): If not None, the first az scan will
            start at this position (but otherwise proceed in the same
            starting direction).
        az_drift (float): Passed through to generate_type3_scan, which
            rejects any value other than None.

    Returns:
        The generator object produced by generate_type3_scan; it
        yields lists of TrackPoint objects.

    """
    passthrough = dict(
        el_freq=0,
        az_vel_ref=az_vel_ref,
        num_batches=num_batches,
        num_scans=num_scans,
        start_time=start_time,
        wait_to_start=wait_to_start,
        step_time=step_time,
        batch_size=batch_size,
        az_start=az_start,
        az_first_pos=az_first_pos,
        az_drift=az_drift,
    )
    # Same value for both el endpoints.
    return generate_type3_scan(az_endpoint1, az_endpoint2, az_speed,
                               acc, el_endpoint1, el_endpoint1,
                               **passthrough)
[docs]
def plan_scan(az_end1, az_end2, el, v_az=1, a_az=1, az_start=None):
    """Work out key parameters for running a ProgramTrack scan with
    the requested end points and velocity.

    The results get complicated in the limit of high velocity and
    narrow scan.

    Args:
        az_end1 (float): first az end point (deg).
        az_end2 (float): second az end point (deg).
        el (float): elevation (deg).  Not used by the current
            calculation.
        v_az (float): az scan speed (deg/s).
        a_az (float): mean turn-around acceleration.  Not used by the
            current calculation.
        az_start (str or None): where the scan starts; one of None,
            'mid', 'mid_inc', 'mid_dec', 'az_endpoint1', 'end',
            'az_endpoint2'.

    Returns:
        A dict with outputs of the calculations.  The following items
        must be considered when generating and posting the track points:

        - 'step_time': The recommended track point separation, in
          seconds.
        - 'wait_to_start': The minimum time (s) between initiating
          ProgramTrack mode and the first uploaded point's timestamp.
        - 'init_az': The az (deg) at which to position the telescope
          before beginning the scan.  This takes into account any "ramp
          up" that needs to occur and the fact that such ramp up needs
          to be finished before the ACU starts profiling the first
          turn-around.

        The following dict items provide additional detail /
        intermediate results:

        - 'scan_start_buffer': Minimum amount (deg of az) by which to
          shift the start of the first scan leg in order to satisfy the
          turn-around prep and ramp-up requirements.  This ultimately
          is what can make init_az different from the natural first leg
          starting point.  This parameter is always non-negative.
        - 'turnprep_buffer': Minimum azimuth travel required for
          ProgramTrack to prepare a turn-around.
        - 'rampup_buffer': Minimum azimuth travel required for
          ProgramTrack to ramp up to the first leg velocity.  Degrees,
          positive.
        - 'rampup_time': Number of seconds before the first track point
          where the platform could start moving (as part of smooth
          acceleration into the initial velocity).

    Raises:
        ValueError: if az_start is not one of the recognized values.
        AssertionError: if the scan would have fewer than 5 points per
            leg even at the shortest allowed step time.

    """
    # Convert Agent-friendly arguments to center / signed throw / init.
    center = (az_end1 + az_end2) / 2
    throw = (az_end2 - az_end1) / 2

    if az_start in [None, 'mid', 'mid_inc', 'mid_dec']:
        init = 'mid'
        if az_start == 'mid_inc':
            throw = abs(throw)
        elif az_start == 'mid_dec':
            throw = -abs(throw)
    elif az_start in ['az_endpoint1', 'end', 'az_endpoint2']:
        init = 'end'
        if az_start == 'az_endpoint2':
            throw = -throw
    else:
        raise ValueError(f'Unexpected az_start={az_start}')

    # Point time separation: aim for 10 points per leg, clamped to
    # [0.1, 1.0] s, and insist on at least 5 points per leg.
    step = 2 * abs(throw / v_az) / 10
    step = min(max(step, 0.1), 1.0)
    assert (2 * abs(throw / v_az) / step >= 5)

    # Turn-around prep distance (deg): 5 point periods, times the vel.
    turnprep_buffer = 5 * step * v_az

    # Ramp-up distance needed, for a ramp-up with peak accel of 1.
    peak_accel = 1.
    rampup_buffer = v_az**2 / peak_accel

    # Az travel available on the first leg: half a leg when starting
    # mid-scan, a full leg when starting from an end.  Any shortfall
    # must be made up by shifting the start.
    if init == 'mid':
        first_leg_travel = abs(throw)
    else:
        first_leg_travel = 2 * abs(throw)
    scan_start_buffer = max(
        turnprep_buffer + rampup_buffer - first_leg_travel, 0)

    rampup_time = v_az / peak_accel

    # Back the starting position off, against the initial direction of
    # motion, by the buffer.
    init_az = center - math.copysign(scan_start_buffer, throw)
    if init == 'end':
        init_az -= throw

    return {
        'step_time': step,
        'turnprep_buffer': turnprep_buffer,
        'rampup_buffer': rampup_buffer,
        'scan_start_buffer': scan_start_buffer,
        'rampup_time': rampup_time,
        'wait_to_start': max(5, rampup_time * 1.2),
        'init_az': init_az,
    }