import argparse
import ast
import os
import queue
import time
import numpy as np
import so3g # noqa: F401
import txaio
import yaml
from ocs import ocs_agent, site_config
from scipy import signal
from spt3g import core
MAX_CHANS = 4096
CHANS_PER_BAND = 512
pA_per_rad = 9e6 / (2 * np.pi)
# Map from primary key-names to their index in the SuperTimestream
# This will be populated when the first frame comes in
primary_idxs = {}
def load_frame_data(frame):
"""
Returns detector data from a G3Stream.
Returns:
times : np.ndarray
Array with shape (nsamps) of timestamps (sec)
data : np.ndarray
Array with shape (nchans, nsamps) of detector phase data (phi0)
"""
primary = frame['primary']
if isinstance(primary, core.G3TimesampleMap):
times = np.array(primary['UnixTime']) / 1e9
else:
if not primary_idxs:
for i, name in enumerate(frame['primary'].names):
primary_idxs[name] = i
times = np.array(primary.data[primary_idxs['UnixTime']]) / 1e9
d = frame['data']
if isinstance(d, core.G3TimestreamMap):
nchans, nsamps = len(d), d.n_samples
data = np.ndarray((nchans, nsamps), dtype=np.float32)
for i in range(nchans):
data[i] = d[f'r{i:0>4}'] * (2 * np.pi) / 2**16
else: # G3SuperTimestream probably
data = d.data * (2 * np.pi) / 2**16
return times, data
def sleep_while_running(duration, session, interval=1):
"""
Sleeps for a certain duration as long as a session object's status is
'starting' or 'running'. If the session is changed to 'stopping',
this will quickly return False. If the sleep is completed without
interruption this will return True.
Args
----
duration : float
Amount of time (sec) to sleep for.
session : OpSessions
Session whose status should be monitored
interval : float, optional
Polling interval (sec) to check the session status. Defaults to 1 sec.
"""
end_time = time.time() + duration
while session.status in ['starting', 'running']:
now = time.time()
if now >= end_time:
return True
time.sleep(min(interval, end_time - now))
return False
[docs]
class FIRFilter:
"""
Class for Finite Input Response filter. Filter state is preserved
between `lfilt` calls so you can filter frame-based data.
From scipy docs, the output of the filter is determined by:
a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
- a[1]*y[n-1] - ... - a[N]*y[n-N]
"""
def __init__(self, b, a, nchans=None):
if nchans is None:
nchans = MAX_CHANS
self.b = b
self.a = a
self.z = np.zeros((nchans, len(b) - 1))
[docs]
def lfilt(self, data, in_place=True):
"""Filters data in place"""
n = len(data)
if in_place:
data[:, :], self.z[:n] = signal.lfilter(
self.b, self.a, data, axis=1, zi=self.z[:n]
)
else:
d, self.z[:n] = signal.lfilter(
self.b, self.a, data, axis=1, zi=self.z[:n]
)
return d
[docs]
@classmethod
def butter_highpass(cls, cutoff, fs, order=5):
"""
Creates an highpass butterworth FIR filter
Args
----
cutoff : float
Cutoff freq (Hz)
fs : float
sample frequency (Hz)
order : int
Order of the filter
"""
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = signal.butter(order, normal_cutoff, btype='high', analog=False)
return cls(b, a)
[docs]
@classmethod
def butter_lowpass(cls, cutoff, fs, order=5):
"""
Creates an lowpass butterworth FIR filter
Args
----
cutoff : float
Cutoff freq (Hz)
fs : float
sample frequency (Hz)
order : int
Order of the filter
"""
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)
return cls(b, a)
[docs]
@classmethod
def moving_avg(cls, width):
"""
Creates a moving avg filter
Args
-----
width : int
number of samples to average together
"""
b = 1. / width * np.ones(width, dtype=float)
a = np.zeros_like(b)
a[0] = 1
return cls(b, a)
[docs]
@classmethod
def differ(cls, delay=1):
"""
FIR filter to calculate a rolling diff of incoming data.
Args
----
delay : int
Sets the diff spacing
"""
b = np.zeros(1 + int(delay))
a = np.zeros_like(b)
b[0], b[-1] = 1, -1
a[0] = 1
return cls(b, a)
[docs]
class Demodulator:
"""
Helper class for demodulating a live timestream
Args
-----
f : float
Demodulation frequency
bw : float
Bandwidth. This will be the filter-cutoff of the applied lowpass filter
fs : float
Sample rate of incoming data
"""
def __init__(self, f, bw=1, fs=200):
self.f = f
self.lp_sin = FIRFilter.butter_lowpass(bw, fs)
self.lp_cos = FIRFilter.butter_lowpass(bw, fs)
[docs]
def apply(self, times, data):
"""
Applies demodulation to data segment.
Returns
--------
demod : np.ndarray
Array of (unnormalized) demodulated data in the same shape as the
input data
"""
sin = np.sin(2 * np.pi * self.f * times)
cos = np.cos(2 * np.pi * self.f * times)
# We don't really care about normalization
demod_sin = self.lp_sin.lfilt(data * sin[None, :], in_place=False)
demod_cos = self.lp_cos.lfilt(data * cos[None, :], in_place=False)
return np.sqrt(demod_sin**2 + demod_cos**2)
[docs]
class WhiteNoiseCalculator:
"""
Helper class for calculating white noise levels of incoming data
Args
-----
fs : float
Sample rate of incoming data
navg : int
Number of samples to average over in the RMS calc
"""
def __init__(self, fs=200, navg=200):
# Aiming for 20 Hz
delay = fs // 20
self.differ = FIRFilter.differ(delay=delay)
self.averager = FIRFilter.moving_avg(navg)
self.fsamp = fs
[docs]
def apply(self, data):
"""
Returns rms / sqrt(fsamp), which estimates the white noise level
"""
return np.sqrt(
self.averager.lfilt(
self.differ.lfilt(data, in_place=False)**2, in_place=False
) / self.fsamp
)
[docs]
class VisElem:
"""
Container for config info for Lyrebird visual elements.
Attributes
--------------
name : str
Name of the channel
x : float
x-coord of the element
y : float
y-coord of the element
rot : float
Rotation angle of the element (rads)
vals : List[str]
List containing the names of the data values corresponding to this
visual element. All vis-elems must have the same number of
data-values. Data-values must be unique, such as
``<channel_name>/rms``
eqs : List[str]
List of equations to be displayed by the visual element. Equations
are written in Polish Notation, and may contain a combination of
numbers and data-values. Data-values can be global as defined in
the lyrebird config file, or channel values as defined in the
``value_names`` argument. There must be the same number of eqs per
visual element.
Polish notation is a method of writing equations where operators
precede the operands, making it easier to parse and evaluate. For
example, the operation :math:`a + b` will be ``+ a b`` in polish
notation, and :math:`(a + b) / 2` can be written as ``/ + a b 2``.
See the magpie docs page for a full list of operators that are
accepted by lyrebird.
eq_labels : List[str]
List of strings used to label equations in the lyrebird gui. This
list must have the same size as the ``eqs`` array.
cmaps : List[str]
List of colormaps to use for each equation. This must be the same
size as the ``eqs`` array.
template : str
Template used for this vis-elem. Templates are defined in the
lyrebird cfg file.
abs_smurf_chan : int
Absolute smurf-channel corresponding to this visual element.
eq_color_is_dynamic : List[bool]
List of booleans that determine if each equation's color-scale is
dynamic or fixed. This must be the same size as the ``eqs`` array.
"""
def __init__(self, name, x, y, rot, template, abs_smurf_chan, cmap_idx=0):
self.x = x
self.y = y
self.rot = rot
self.template = template
self.abs_smurf_chan = abs_smurf_chan
self.name = name
vals = [
'{name}/raw', '{name}/demod', '{name}/wl', '{name}/flagged',
'{name}/smurf_band', '{name}/smurf_chan'
]
self.vals = [v.format(name=name) for v in vals]
eq_templates = {
'raw': '{name}/raw',
'demod': '* {name}/demod rms_scale',
'wl': '* {name}/wl wl_scale',
'flagged': '{name}/flagged',
'smurf_band': f'{self.abs_smurf_chan // CHANS_PER_BAND}',
'smurf_chan': f'{self.abs_smurf_chan % 512}',
}
self.eqs = [
eq.format(name=name) for eq in eq_templates.values()
]
self.eq_labels = [k for k in eq_templates.keys()]
self.color_is_dynamic = [False for _ in self.eqs]
self.color_is_dynamic[0] = True
self.cmaps = [
['red_cmap' for _ in eq_templates],
['blue_cmap' for _ in eq_templates]
][cmap_idx]
[docs]
class FocalplaneConfig:
def __init__(self):
"""
Object to configure the focal-plane layout.
Attributes
-------------
chan_mask : np.ndarray
Map from absolute_smurf_chan --> Visual Element index
channels : list
List of visual elements
"""
self.channels = []
self.chan_mask = np.full(MAX_CHANS, -1)
[docs]
def config_frame(self):
"""
Generates a config frame for lyrebird
"""
xs = []
ys = []
rots = []
cnames = []
templates = []
value_names = []
eqs = []
eq_labels = []
eq_color_is_dynamic = []
cmaps = []
for c in self.channels:
xs.append(c.x)
ys.append(c.y)
rots.append(c.rot)
cnames.append(c.name)
templates.append(c.template)
value_names.extend(c.vals)
eqs.extend(c.eqs)
eq_labels.extend(c.eq_labels)
eq_color_is_dynamic.extend(c.color_is_dynamic)
cmaps.extend(c.cmaps)
frame = core.G3Frame(core.G3FrameType.Wiring)
frame['x'] = core.G3VectorDouble(xs)
frame['y'] = core.G3VectorDouble(ys)
frame['cname'] = core.G3VectorString(cnames)
frame['rotation'] = core.G3VectorDouble(rots)
frame['templates'] = core.G3VectorString(templates)
frame['values'] = core.G3VectorString(value_names)
frame['color_is_dynamic'] = core.G3VectorBool(eq_color_is_dynamic)
frame['equations'] = core.G3VectorString(eqs)
frame['eq_labels'] = core.G3VectorString(eq_labels)
frame['cmaps'] = core.G3VectorString(cmaps)
return frame
[docs]
def add_vis_elem(self, *args, **kwargs):
"""
Adds a visual element to the focal-plane and updates the channel mask
"""
c = VisElem(*args, **kwargs)
self.channels.append(c)
self.chan_mask[c.abs_smurf_chan] = len(self.channels) - 1
[docs]
@classmethod
def grid(cls, stream_id, xdim, ydim, ygap=0, offset=(0., 0.)):
"""
Creates a FocalplaneConfig object for a grid of channels.
Args
----
stream_id : str
Stream-id for the magpie agent. This will be prepended to all
lyrebird data-val names.
xdim : int
Number of channels in the x-dim of the grid
ydim : int
Number of channels in the y-dim of the grid
ygap : int
A small gap will be added every ``ygap`` rows, to better organize
channels.
offset : Tuple(float, float)
Global offset of the grid with respect to the lyrebird
coordinate-system
"""
fp = cls()
xs, ys = np.arange(xdim), np.arange(ydim)
xs = xs + offset[0]
ys = ys + offset[1]
# Adds gaps every ygap rows
if ygap > 0:
ys = ys + .5 * (ys // ygap)
template = 'box'
for i in range(xdim * ydim):
x, y = xs[i % xdim], ys[i // xdim]
name = f"{stream_id}/channel_{i}"
fp.add_vis_elem(name, x, y, 0, template, i)
return fp
[docs]
@classmethod
def from_csv(cls, stream_id, detmap_file, wafer_scale=1., offset=(0, 0), rotation=0.):
"""
Creates a FocalplaneConfig object from a detmap csv file.
Args
-----
stream_id : str
Stream-id for the magpie agent. This will be prepended to all
lyrebird data-val names.
detmap_file : str
Path to detmap csv file.
wafer_scale : int
Scalar to multiply against det x and y positions when translating
to lyrebird positions. Defaults to 1, meaning that the lyrebird
coordinate system will be the same as the det-map coordinate system,
so x and y will be in um.
offset : Tuple(float, float)
Global offset of the grid with respect to the lyrebird
coordinate-system. If wafer_scale is 1, this should be in um.
"""
import pandas as pd
fp = cls()
df = pd.read_csv(detmap_file)
templates = ["template_c0_p0", "template_c1_p0", ]
color_idxs = {}
ncolors = 0
for i, row in df.iterrows():
rot = 0
try:
bandpass = int(row['bandpass'])
if bandpass in color_idxs:
cidx = color_idxs[bandpass]
else:
color_idxs[bandpass] = ncolors
ncolors += 1
cidx = color_idxs[bandpass]
template = templates[cidx]
if row['pol'].strip() == 'B':
rot = np.pi / 2
except ValueError:
# Just skip detectors with unknown bandpass
continue
x0, y0 = row['det_x'] * wafer_scale, row['det_y'] * wafer_scale
x = x0 * np.cos(rotation) - y0 * np.sin(rotation) + offset[0]
y = x0 * np.sin(rotation) + y0 * np.cos(rotation) + offset[1]
band, chan = row['smurf_band'], row['smurf_channel']
if chan == -1:
continue
abs_smurf_chan = band * CHANS_PER_BAND + chan
name = f"{stream_id}/det_{i}"
fp.add_vis_elem(name, x, y, rot, template, abs_smurf_chan, cmap_idx=cidx)
return fp
[docs]
class MagpieAgent:
"""
Agent for processing streamed G3Frames, and sending data to lyrebird.
Attributes
-----------
target_rate : float
This is the target sample rate of data to be sent to lyrebird.
Incoming data will be downsampled to this rate before being sent out.
ds_offset : int
Offset for downsample to avoid hiccups at frame boundaries
fp : FocalplaneConfig
This is the FocalplaneConfig object that contains info about what
channels are present in the focal-plane representation, and their
positions.
mask : np.ndarray
This is a channel mask that maps readout channel to
absolute-smurf-chan. Before a status frame containing the ChannelMask
is seen in the G3Stream, this defaults to being an identity mapping
which just sends the readout channel no. to itself. Once a status
frame with the channel mask is seen, this will be updated
out_queue : Queue
This is a queue containing outgoing G3Frames to be sent to lyrebird.
delay : float
The outgoing stream will attempt to enforce this delay between the
relative timestamps in the G3Frames and the real time to ensure a
smooth flow of data. This must be greater than the frame-aggregation
time of the SmurfStreamer or else lyrebird will update data in spurts.
avg1, avg2 : RollingAvg
Two Rolling Averagers which are used to calculate the rolling RMS data.
monitored_channels : list
List of monitored channels whose data should be sent to grafana.
This list will contain entries which look like
``(readout_chan_number, field_name)``.
monitored_chan_sample_rate : float
Sample rate (Hz) to target when downsampling monitored channel data for
grafana.
demod : Demodulator
Demodulator used to calculate demod signal for incoming timestreams
self.demod_freq : float
Demodulation frequency
self.demod_bandwidth : float
Filter cutoff for demodulation lowpass
wlcalc : WhiteNoiseCalculator
WhiteNoiseCalculator used to calculate white noise levels for incoming
timestreams
"""
mask_register = 'AMCc.SmurfProcessor.ChannelMapper.Mask'
def __init__(self, agent, args):
self.agent: ocs_agent.OCSAgent = agent
self.log = self.agent.log
self._running = False
self.target_rate = args.target_rate
self.ds_offset = 0
layout = args.layout.lower()
if layout == 'grid':
self.fp = FocalplaneConfig.grid(
args.stream_id, args.xdim, args.ydim, ygap=8, offset=args.offset
)
elif layout == 'wafer':
if args.det_map is not None:
self.fp = FocalplaneConfig.from_csv(
args.stream_id, args.det_map, wafer_scale=args.wafer_scale,
offset=args.offset, rotation=args.rotation
)
else:
raise ValueError("CSV file must be set using the det-map arg if "
"using wafer layout")
self.mask = np.arange(MAX_CHANS)
self.out_queue = queue.Queue(1000)
self.delay = args.delay
self.demod = None
self.demod_freq = args.demod_freq
self.demod_bandwidth = args.demod_bandwidth
self.wncalc = None
self.monitored_channels = []
self.monitored_chan_sample_rate = 10
self.agent.register_feed(
'detector_tods', record=True,
agg_params={'exclude_aggregator': True}
)
self.agent.register_feed('white_noise', record=True,)
[docs]
@ocs_agent.param('target_rate', type=float)
def set_target_rate(self, session, params):
"""set_target_rate(target_rate)
Sets the target downsampled sample-rate of the data sent to lyrebird
Args:
target_rate : float
Target sample rate for lyrebird (Hz)
"""
self.target_rate = params['target_rate']
return True, f'Set target rate to {self.target_rate}'
[docs]
@ocs_agent.param('delay', type=float)
def set_delay(self, session, params):
"""set_delay(delay)
Sets the target downsampled sample-rate of the data sent to lyrebird
Args:
target_rate : float
Target sample rate for lyrebird (Hz)
"""
self.delay = params['delay']
return True, f'Set delay param to {self.delay}'
[docs]
@ocs_agent.param('chan_info', type=list, check=lambda x: len(x) <= 6)
@ocs_agent.param('sample_rate', type=float, default=10,
check=lambda x: 0 < x <= 20)
def set_monitored_channels(self, session, params):
"""set_monitored_channels(chan_info, sample_rate=10)
**Task** - Sets channels which will be monitored and have their
downsampled data sent to grafana.
Field names can be manually specified in the chan_info list because it
may be helpful to set a consistent name for specific channels, such as
"in_transition", instead of using the autogenerated channel name, which
can change between tunes. Any additional field names used will remain
in the influx database, so be wary of programatically adding many of
them.
Args
------
chan_info : list
List of channel info corresponding to channels to monitor.
Entries of this list can be:
- ints: This will be interpreted as the readout-channel to
monitor. The field-name will be set to "r<chan_no>".
- tuples of length 2: Here the first element will be the readout
chan to monitor, and the second value will be the field name to
use for that channel.
This list can be no more than 6 elements to limit how much detector
data is saved to the HK format.
sample_rate : float
Target sample rate (Hz) to downsample detector data to. This must
be less than 20 Hz to limit how much detector data is saved to hk.
"""
monitored_chans = []
for ch_info in params['chan_info']:
if isinstance(ch_info, int):
field_name = f"r{ch_info:0>4}"
monitored_chans.append((ch_info, field_name))
elif isinstance(ch_info, (tuple, list)):
monitored_chans.append(ch_info)
else:
raise ValueError(
f"ch_info (type {type(ch_info)}) must be of type int or "
"tuple"
)
self.monitored_channels = monitored_chans
self.monitored_chan_sample_rate = params['sample_rate']
return True, "Set monitored channels"
def _process_status(self, frame):
"""
Processes a status frame. This will set or update the channel
mask whenever the smurf metadata is updated.
"""
if 'session_id' not in frame:
return
if self.mask_register in frame['status']:
status = yaml.safe_load(frame['status'])
self.mask = np.array(
ast.literal_eval(status[self.mask_register])
)
def _process_monitored_chans(self, times, data):
"""
Downsamples data for monitored channels and publishes tods to a grafana
feed.
"""
if not self.monitored_channels:
return
target_rate = self.monitored_chan_sample_rate
if len(times) <= 1:
ds_factor = 1
else:
input_rate = 1. / np.median(np.diff(times))
ds_factor = max(int(input_rate // target_rate), 1)
sl = slice(None, None, ds_factor)
times_out = times[sl]
for rc, field_name in self.monitored_channels:
if rc >= len(data):
self.log.warn(
f"Readout channel {rc} is larger than the number of"
f"streamed channels ({len(data)})! Data won't be published"
"to grafana."
)
continue
_data = {
'timestamps': times_out.tolist(),
'block_name': field_name,
'data': {
field_name: data[rc, sl].tolist()
}
}
self.agent.publish_to_feed('detector_tods', _data)
def _publish_wls(self, wls):
"""
Publishes white-noise quantiles to ocs feed
"""
quantiles = [15, 25, 50, 75, 85]
labels = [f'white_noise_q{q}' for q in quantiles]
data = {
'timestamp': time.time(),
'block_name': 'white_noise',
'data': {
k: np.quantile(wls, q / 100)
for k, q in zip(labels, quantiles)
}
}
self.agent.publish_to_feed('white_noise', data)
def _process_data(self, frame, source_offset=0):
"""
Processes a Scan frame. If lyrebird is enabled, this will return a seq
of G3Frames that are formatted for lyrebird to ingest.
"""
if 'session_id' not in frame:
return []
# Calculate downsample factor
times_in, data_in = load_frame_data(frame)
times_in = times_in - source_offset
sample_rate = 1. / np.median(np.diff(times_in))
nsamps = len(times_in)
nchans = len(data_in)
self._process_monitored_chans(times_in, data_in)
if self.demod is None:
self.demod = Demodulator(self.demod_freq, self.demod_bandwidth,
fs=sample_rate)
if self.wncalc is None:
self.wncalc = WhiteNoiseCalculator(
fs=sample_rate, navg=int(sample_rate)
)
demod = self.demod.apply(times_in, data_in)
# white noise in units of pA/rt(Hz)
wl = self.wncalc.apply(data_in * pA_per_rad)
self._publish_wls(np.median(wl, axis=1))
ds_factor = sample_rate // self.target_rate
if np.isnan(ds_factor): # There is only one element in the timestream
ds_factor = 1
ds_factor = max(int(ds_factor), 1) # Prevents downsample factors < 1
# Arrange output data structure
sample_idxs = np.arange(self.ds_offset, nsamps, ds_factor, dtype=np.int32)
self.ds_offset = ds_factor - (nsamps - self.ds_offset) % ds_factor
num_frames = len(sample_idxs)
times_out = times_in[sample_idxs]
abs_chans = self.mask[np.arange(nchans)]
nelems = len(self.fp.channels)
raw_out = np.zeros((num_frames, nelems))
demod_out = np.zeros((num_frames, nelems))
wl_out = np.zeros((num_frames, nelems))
for i, c in enumerate(abs_chans):
if c >= len(self.fp.chan_mask):
continue
idx = self.fp.chan_mask[c]
if idx >= 0:
raw_out[:, idx] = data_in[i, sample_idxs]
demod_out[:, idx] = demod[i, sample_idxs]
wl_out[:, idx] = wl[i, sample_idxs]
out = []
for i in range(num_frames):
fr = core.G3Frame(core.G3FrameType.Scan)
fr['idx'] = 0
fr['data'] = core.G3VectorDouble(raw_out[i])
fr['timestamp'] = core.G3Time(times_out[i] * core.G3Units.s)
out.append(fr)
fr = core.G3Frame(core.G3FrameType.Scan)
fr['idx'] = 1
fr['data'] = core.G3VectorDouble(demod_out[i])
fr['timestamp'] = core.G3Time(times_out[i] * core.G3Units.s)
out.append(fr)
fr = core.G3Frame(core.G3FrameType.Scan)
fr['idx'] = 2
fr['data'] = core.G3VectorDouble(wl_out[i])
fr['timestamp'] = core.G3Time(times_out[i] * core.G3Units.s)
out.append(fr)
return out
[docs]
def read(self, session, params=None):
"""read(src='tcp://localhost:4532')
**Process** - Process for reading in G3Frames from a source or list of
sources. If this source is an address that begins with ``tcp://``, the
agent will attempt to connect to a G3NetworkSender at the specified
location. The ``src`` param can also be a filepath or list of filepaths
pointing to G3Files to be streamed. If a list of filenames is passed,
once the first file is finished streaming, subsequent files will be
streamed.
"""
self._running = True
src_idx = 0
if isinstance(params['src'], str):
sources = [params['src']]
else:
sources = params['src']
reader = None
source = None
source_offset = 0
while self._running:
if reader is None:
try:
source = sources[src_idx]
source_is_file = not source.startswith('tcp://')
reader = core.G3Reader(source, timeout=5)
except RuntimeError as e:
if source_is_file:
# Raise error if file cannot be found
raise e
else:
# If not a file, log error and try again
self.log.error("G3Reader could not connect! Retrying in 10 sec.")
time.sleep(10)
continue
frames = reader.Process(None)
if not frames:
# If source is a file, start over with next file or break if
# finished all sources. If socket, just reset reader and try to
# reconnect
if source_is_file:
src_idx += 1
if src_idx >= len(sources):
self.log.info("Finished reading all sources")
break
reader = None
continue
frame = frames[0]
# If this source is a file, this will shift the timestamps so that
# data lines up with the current timestamp instead of using the
# timestamps in the file
if source_is_file and (not source_offset):
source_offset = frame['time'].time / core.G3Units.s \
- time.time()
elif not source_is_file:
source_offset = 0
if frame.type == core.G3FrameType.Wiring:
self._process_status(frame)
continue
elif frame.type == core.G3FrameType.Scan:
out = self._process_data(frame, source_offset=source_offset)
else:
continue
for f in out:
# This will block until there's a free spot in the queue.
# This is useful if the src is a file and reader.Process does
# not block
self.out_queue.put(f)
return True, "Stopped read process"
def _stop_read(self, session, params=None):
self._running = False
return True, "Stopping read process"
[docs]
def stream_fake_data(self, session, params=None):
"""stream_fake_data()
**Process** - Process for streaming fake data. This will queue up
G3Frames full of fake data to be sent to lyrebird.
"""
self._run_fake_stream = True
ndets = len(self.fp.channels)
chans = np.arange(ndets)
frame_start = time.time()
while self._run_fake_stream:
time.sleep(2)
frame_stop = time.time()
ts = np.arange(frame_start, frame_stop, 1. / self.target_rate)
frame_start = frame_stop
nframes = len(ts)
data_out = np.random.normal(0, 1, (nframes, ndets))
data_out += np.sin(2 * np.pi * ts[:, None] + .2 * chans[None, :])
for t, d in zip(ts, data_out):
fr = core.G3Frame(core.G3FrameType.Scan)
fr['idx'] = 0
fr['data'] = core.G3VectorDouble(d)
fr['timestamp'] = core.G3Time(t * core.G3Units.s)
self.out_queue.put(fr)
fr = core.G3Frame(core.G3FrameType.Scan)
fr['idx'] = 1
fr['data'] = core.G3VectorDouble(np.sin(d))
fr['timestamp'] = core.G3Time(t * core.G3Units.s)
self.out_queue.put(fr)
return True, "Stopped fake stream process"
def _stop_stream_fake_data(self, session, params=None):
self._run_fake_stream = False
return True, "Stopping fake stream process"
[docs]
@ocs_agent.param('dest', type=int)
def send(self, session, params=None):
"""send(dest)
**Process** - Process for sending outgoing G3Frames. This will query
the out_queue for frames to be sent to lyrebird. This will try to
regulate how fast it sends frames such that the delay between when the
frames are sent, and the timestamp of the frames are fixed.
"""
self._send_running = True
first_frame_time = None
stream_start_time = None
sender = core.G3NetworkSender(
hostname='*', port=params['dest'], max_queue_size=1000
)
sender.Process(self.fp.config_frame())
while session.status in ['starting', 'running']:
f = self.out_queue.get(block=True)
t = f['timestamp'].time / core.G3Units.s
now = time.time()
if first_frame_time is None:
first_frame_time = t
stream_start_time = now
this_frame_time = stream_start_time + (t - first_frame_time) + self.delay
sleep_while_running(this_frame_time - now, session)
sender.Process(f)
return True, "Stopped send process"
def _send_stop(self, session, params=None):
self._send_running = False
return True, "Stopping send process"
def make_parser(parser=None):
if parser is None:
parser = argparse.ArgumentParser()
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--src', nargs='+', default='tcp://localhost:4532',
help="Address of incoming G3Frames.")
pgroup.add_argument('--dest', type=int, default=8675,
help="Port to serve lyrebird frames")
pgroup.add_argument('--stream-id', type=str, default='none',
help="Stream-id to use to distinguish magpie streams."
"This will be prepended to data-val names in lyrebird.")
pgroup.add_argument('--target-rate', '-t', type=float, default=20,
help="Target sample rate for data being sent to Lyrebird. "
"Detector data will be downsampled to this rate.")
pgroup.add_argument(
'--delay', type=float, default=5,
help="Delay (sec) between the timestamp of a G3Frame relative to the "
"initial frame, and when the frame should be sent to lyrebird. "
"This must be larger than the frame-aggregation time for smooth "
"update times in lyrebird."
)
pgroup.add_argument('--layout', '-l', default='grid', choices=['grid', 'wafer'],
help="Focal plane layout style")
pgroup.add_argument('--xdim', type=int, default=64,
help="Number of pixels in x-dimension for grid layout")
pgroup.add_argument('--ydim', type=int, default=64,
help="Number of pixels in y-dimension for grid layout")
pgroup.add_argument('--wafer-scale', '--ws', type=float, default=50.,
help="scale of wafer coordinates")
pgroup.add_argument('--det-map', type=str, help="Path to det-map csv file")
pgroup.add_argument('--fake-data', action='store_true',
help="If set, will stream fake data instead of listening to "
"a G3stream.")
pgroup.add_argument('--offset', nargs=2, default=[0, 0], type=float,
help="Offset of detector coordinates with respect to "
"lyrebird coordinate system")
pgroup.add_argument('--rotation', default=0., type=float,
help="Rotation of wafer about its center (rad)")
pgroup.add_argument('--monitored-channels', nargs='+', type=int, default=[],
help="Readout channels to start monitoring on startup")
pgroup.add_argument('--monitored-channel-rate', type=float, default=10,
help="Target sample rate for monitored channels")
pgroup.add_argument('--demod-freq', type=float, default=8,
help="Demodulation frequency")
pgroup.add_argument('--demod-bandwidth', type=float, default=0.5,
help="Demodulation bandwidth")
return parser
def main(args=None):
txaio.use_twisted()
txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))
parser = make_parser()
args = site_config.parse_args(agent_class='MagpieAgent',
parser=parser,
args=args)
agent, runner = ocs_agent.init_site_agent(args)
magpie = MagpieAgent(agent, args)
if args.fake_data:
read_startup = False
else:
read_startup = {'src': args.src}
agent.register_process('read', magpie.read, magpie._stop_read,
startup=read_startup)
agent.register_process('stream_fake_data', magpie.stream_fake_data,
magpie._stop_stream_fake_data, startup=args.fake_data)
agent.register_process('send', magpie.send, magpie._send_stop,
startup={'dest': args.dest})
agent.register_task('set_target_rate', magpie.set_target_rate)
agent.register_task('set_delay', magpie.set_delay)
agent.register_task(
'set_monitored_channels', magpie.set_monitored_channels,
startup={'chan_info': args.monitored_channels,
'sample_rate': args.monitored_channel_rate}
)
runner.run(agent, auto_reconnect=True)
if __name__ == '__main__':
main()