Source code for socs.agents.smurf_file_emulator.agent

import argparse
import os
import time

import numpy as np
import so3g
import txaio
import yaml
from ocs import ocs_agent, site_config
from spt3g import core


def get_smurf_status():
    """Loads a sample status dict from file"""
    status_file = os.path.join(os.path.split(__file__)[0], 'status_sample.yaml')
    with open(status_file, 'r') as f:
        return yaml.safe_load(f)


SOSTREAM_VERSION = 2
NBIASLINES = 16
NBANDS = 8
# Range of frequencies allowed by smurf
SMURF_FREQ_RANGE = (4e3, 8e3)
SUBBANDS_PER_BAND = 512
CHANS_PER_BAND = 512

primary_names = [
    'UnixTime', 'FluxRampIncrement', 'FluxRampOffset', 'Counter0',
    'Counter1', 'Counter2', 'AveragingResetBits', 'FrameCounter',
    'TESRelaySetting'
]
primary_idxs = {name: idx for idx, name in enumerate(primary_names)}


[docs] class Tune: """ Helper class for generating tunes """ def __init__(self, nchans=1720): self.log = txaio.make_logger() self.nchans = nchans self.res_freqs = np.linspace(*SMURF_FREQ_RANGE, nchans, endpoint=False) band_width = (SMURF_FREQ_RANGE[1] - SMURF_FREQ_RANGE[0]) / NBANDS subband_width = band_width / SUBBANDS_PER_BAND rs = self.res_freqs - SMURF_FREQ_RANGE[0] self.bands = (rs / band_width).astype(int) self.subbands = ((rs / subband_width) % SUBBANDS_PER_BAND).astype(int) # just assigns channels in order for each band, making sure this # doesn't go above chans_per_band self.channels = np.full(nchans, -1, dtype=int) for b in np.unique(self.bands): m = self.bands == b self.channels[m] = np.arange(np.sum(m)) self.channels[self.channels >= CHANS_PER_BAND] = -1 self.assignment_files = [None for _ in range(NBANDS)]
[docs] def encode_band(self, band): """ Encodes band-information in the format of pysmurf tunefiles. This has the same structure as pysmurf tunefiles, but contains just enough information for indexing. """ d = { 'lock_status': {}, 'find_freq': { 'resonance': self.res_freqs, }, 'tone_power': 12, 'resonances': {} } for i, f in enumerate(self.res_freqs[self.bands == band]): d['resonances'][i] = {'freq': f} if self.assignment_files[band] is not None: d['channel_assignment'] = self.assignment_files[band] return d
[docs] def encode_tune(self): """ Encodes a full tune dictionary in the format of pysmurf tunefiles. """ return { b: self.encode_band(b) for b in np.unique(self.bands) }
[docs] def write_tune(self, basedir=''): """ Writes tune to disk. Args ---- basedir : str Directory where tune should be written. """ timestamp = int(time.time()) path = os.path.join(basedir, f'{timestamp}_tune.npy') np.save(path, self.encode_tune(), allow_pickle=True) self.tune_path = path self.log.debug(f"Writing tune: {self.tune_path}") return path
[docs] def write_channel_assignments(self, bands=None, basedir=''): """ Writes channel assignment files to disk. Args ----- bands : optional, int, list[int] Bands to write to disk. Defaults to all that are present in the tune. basedir : str Directory where files should be written """ if bands is None: bands = np.unique(self.bands) bands = np.atleast_1d(bands) timestamp = int(time.time()) for b in bands: path = os.path.join( basedir, f'{timestamp}_channel_assignment_b{b}.txt' ) m = self.bands == b d = np.array([ self.res_freqs[m], self.subbands[m], self.channels[m], np.full(np.sum(m), -1) ]).T np.savetxt(path, d, fmt='%.4f,%d,%d,%d') self.assignment_files[b] = path
[docs] class G3FrameGenerator: """ Helper class for generating G3 Streams. """ def __init__(self, stream_id, sample_rate, tune, action=None, action_time=None, quantize=True, drop_chance=0): self.frame_num = 0 self.sample_num = 0 self.session_id = int(time.time()) self.tune = tune self.nchans = np.sum(tune.channels != -1) self.sample_rate = sample_rate self.stream_id = stream_id self.action = action self.action_time = action_time self.quantize = quantize self.drop_chance = drop_chance def tag_frame(self, fr): fr['frame_num'] = self.frame_num fr['session_id'] = self.session_id fr['sostream_id'] = self.stream_id fr['sostream_version'] = SOSTREAM_VERSION fr['time'] = core.G3Time(time.time() * core.G3Units.s) self.frame_num += 1 return fr def get_obs_start_frame(self): fr = core.G3Frame(core.G3FrameType.Observation) fr['stream_placement'] = 'start' self.tag_frame(fr) return fr def get_obs_end_frame(self): fr = core.G3Frame(core.G3FrameType.Observation) fr['stream_placement'] = 'end' self.tag_frame(fr) return fr def get_status_frame(self, tag=''): fr = core.G3Frame(core.G3FrameType.Wiring) s = get_smurf_status() tune_key = 'AMCc.FpgaTopLevel.AppTop.AppCore.SysgenCryo.tuneFilePath' s[tune_key] = self.tune.tune_path tag_key = 'AMCc.SmurfProcessor.SOStream.stream_tag' s[tag_key] = tag m = self.tune.channels != -1 chmask = self.tune.channels[m] + self.tune.bands[m] * CHANS_PER_BAND s['AMCc.SmurfProcessor.ChannelMapper.Mask'] = str(chmask.tolist()) s['AMCc.SmurfProcessor.ChannelMapper.NumChannels'] = self.nchans.item() pysmurf_root = "AMCc.SmurfProcessor.SOStream" if self.action is not None: s[f'{pysmurf_root}.pysmurf_action'] = self.action if self.action_time is not None: s[f'{pysmurf_root}.pysmurf_action_timestamp'] = int(self.action_time) fr['status'] = yaml.dump(s) fr['dump'] = True self.tag_frame(fr) return fr def get_data_frame(self, start, stop): if self.quantize: # When "quantized", this clamps (start) and (stop) to integers so # timestamps created by np.arange are lined up when there's an # integer sample rate t0, t1 = int(start) - 1, int(stop) + 1 nsamp = int((t1 - t0) * self.sample_rate) times = np.linspace(t0, t1, nsamp + 1, endpoint=True) m = (start <= times) & (times < stop) times = times[m] else: times = np.arange(start, stop, 1. / self.sample_rate) nsamps = len(times) frame_counter = np.arange(self.sample_num, self.sample_num + nsamps, dtype=int) self.sample_num += nsamps chans = np.arange(self.nchans) names = [f'r{ch:0>4}' for ch in chans] count_per_phi0 = 2**16 data = np.zeros((self.nchans, nsamps), dtype=np.int32) data += count_per_phi0 * chans[:, None] data += (count_per_phi0 * 0.2 * np.sin(2 * np.pi * 8 * times)).astype(int) data += (count_per_phi0 * np.random.normal(0, 0.03, (self.nchans, nsamps))).astype(int) # Toss samples based on drop_chance m = self.drop_chance < np.random.uniform(0, 1, len(times)) times = times[m] frame_counter = frame_counter[m] data = data[:, m] nsamps = len(times) fr = core.G3Frame(core.G3FrameType.Scan) g3times = core.G3VectorTime(times * core.G3Units.s) fr['data'] = so3g.G3SuperTimestream(names, g3times, data) primary_data = np.zeros((len(primary_names), nsamps), dtype=np.int64) primary_data[primary_idxs['UnixTime'], :] = (times * 1e9).astype(int) primary_data[primary_idxs['FrameCounter'], :] = frame_counter fr['primary'] = so3g.G3SuperTimestream(primary_names, g3times, primary_data) tes_bias_names = [f'bias{bg:0>2}' for bg in range(NBIASLINES)] bias_data = np.zeros((NBIASLINES, nsamps), dtype=np.int32) fr['tes_biases'] = so3g.G3SuperTimestream(tes_bias_names, g3times, bias_data) fr['timing_paradigm'] = 'Low Precision' fr['num_samples'] = nsamps self.tag_frame(fr) return fr
[docs] class DataStreamer: """ Helper class for streaming G3 data """ def __init__(self, stream_id, sample_rate, tune, timestreamdir, file_duration, frame_len, action=None, action_time=None, drop_chance=0, tag=''): self.frame_gen = G3FrameGenerator(stream_id, sample_rate, tune, action=action, action_time=action_time, drop_chance=drop_chance) self.session_id = self.frame_gen.session_id self.stream_id = stream_id self.timestreamdir = timestreamdir self.seq = 0 self.file_duration = file_duration self.file_start = 0 self.writer = None self.file_list = [] self.frame_len = frame_len self.tag = tag self._last_stop = None def _get_g3_filename(self): """ Returns the file path for a g3-file with specified session id and seq idx. """ timecode = f"{self.session_id}"[:5] subdir = os.path.join(self.timestreamdir, timecode, self.stream_id) filepath = os.path.join(subdir, f"{self.session_id}_{self.seq:0>3}.g3") return filepath def _new_file(self): """ Ends the current G3File (if one is open) and begins a new one, incrementing ``seq`` after updating. """ self.end_file() fname = self._get_g3_filename() os.makedirs(os.path.dirname(fname), exist_ok=True) self.writer = core.G3Writer(fname) if self.seq == 0: self.writer(self.frame_gen.get_obs_start_frame()) self.writer(self.frame_gen.get_status_frame(tag=self.tag)) self.file_start = time.time() self.file_list.append(fname) self.seq += 1
[docs] def end_file(self): """ Ends the current file by sending a G3EndProcessing Frame. """ if self.writer is not None: self.writer(self.frame_gen.get_obs_end_frame()) self.writer(core.G3Frame(core.G3FrameType.EndProcessing))
[docs] def write_next(self): """ Writes the next data frame to disk. Will rotate files based on the current file start time and the file duration. This sleep wait for the frame-duration before writing the G3Frame to disk. """ if self._last_stop is None: start = time.time() else: start = self._last_stop if (start - self.file_start > self.file_duration) or (self.writer is None): self._new_file() time.sleep(self.frame_len) stop = time.time() self._last_stop = stop self.writer(self.frame_gen.get_data_frame(start, stop))
[docs] def stream_between(self, start, stop, wait=False): """ This function will create a new observation and "stream" data between a specified start and stop time. This function will by default generate and write the data without sleeping for the specified amount of time. To avoid confusion, this will not rotate G3Files since that gets kind of complicated when you're not running in real time. Args ------ start : float Start time of data stop : float Stop time of data wait : bool If True, will sleep for the correct amount of time between each written frame. Defaults to False. """ frame_starts = np.arange(start, stop, self.frame_len) frame_stops = frame_starts + self.frame_len # In case there's already an open file self.seq = 0 self.end_file() self._new_file() for t0, t1 in zip(frame_starts, frame_stops): if wait: now = time.time() if now < t1: time.sleep(t1 - now) self.writer(self.frame_gen.get_data_frame(t0, t1)) self.end_file()
[docs] class SmurfFileEmulator: """ OCS Agent for emulating file creation for the smurf system. """ def __init__(self, agent, args): self.log = agent.log self.file_duration = args.file_duration self.stream_id = args.stream_id self.basedir = args.base_dir if not os.path.exists(self.basedir): raise ValueError(f"Basedir {self.basedir} does not exist") self.smurfdir = os.path.join(self.basedir, 'smurf') self.timestreamdir = os.path.join(self.basedir, 'timestreams') self.nchans = args.nchans self.sample_rate = args.sample_rate self.frame_len = args.frame_len self.drop_chance = args.drop_chance self.streaming = False self.tune = None def _new_streamer(self, action=None, action_time=None, tag=''): return DataStreamer( self.stream_id, self.sample_rate, self.tune, self.timestreamdir, self.file_duration, self.frame_len, action=action, action_time=action_time, drop_chance=self.drop_chance, tag=tag ) def _get_action_dir(self, action, action_time=None, is_plot=False): t = int(time.time()) if action_time is None: action_time = t action_time = int(action_time) timecode = f"{action_time}"[:5] dir_type = 'plots' if is_plot else 'outputs' subdir = os.path.join( self.smurfdir, timecode, self.stream_id, f'{action_time}_{action}', dir_type ) os.makedirs(subdir, exist_ok=True) return subdir def _write_smurf_file(self, name, action, action_time=None, prepend_ctime=True, is_plot=False): """ Creates a fake pysmurf ancilliary file containing just the creation time of the file. For example:: self._write_smurf_file('IV.npy', 'take_IV') will create the file:: data/smurf/16371/<stream_id>/1637182065_take_IV/1637182065_IV.npy Args: name (str): Name of the file to be created. This should not include the ctime since the ctime will be prepended. I.e. "IV.npy" will become "<ctime>_IV.npy" action (str): Pysmurf action name for determining the subdirectory this should be written to. action_time (float): Action timestamp for the file. If None, will set to the current time. """ t = int(time.time()) if action_time is None: action_time = t subdir = self._get_action_dir( action, action_time=action_time, is_plot=is_plot ) if prepend_ctime: filepath = os.path.join(subdir, f'{t}_{name}') else: filepath = os.path.join(subdir, name) self.log.info(f"Writing smurf file: {filepath}") with open(filepath, 'w') as f: f.write(f'start: {time.time()}\n') return filepath def _run_setup(self, action=None, action_time=None, session=None): """ Helper function to run setup operations, used by uxm_setup and uxm_relock. """ self.tune = Tune(nchans=self.nchans) if action is None: action = 'uxm_setup' if action_time is None: action_time = time.time() # Find Freq files = ['amp_sweep_freq.txt', 'amp_sweep_resonance.txt', 'amp_sweep_resp.txt'] for f in files: self._write_smurf_file(f, action, action_time=action_time) # Setup Notches sdir = self._get_action_dir(action, action_time=action_time) self.tune.write_channel_assignments(basedir=sdir) self.tune.write_tune(basedir=sdir) # tracking setup action_time = time.time() fname = f"{int(time.time())}.dat" self._write_smurf_file(fname, action, action_time=action_time, prepend_ctime=False) # Short g3 stream streamer = self._new_streamer(action=action, action_time=action_time, tag='oper,noise') now = time.time() streamer.stream_between(now, now + 30, wait=False) if session is not None: session.data['noise_file'] = streamer.file_list[0]
[docs] @ocs_agent.param('sleep', default=True) def uxm_setup(self, session, params): """uxm_setup(sleep=True) **Task** - Emulates files that might come from a general tune dets function. These are some of the files found on simons1 registered when running the following ops with a single band: 1. Find-freq 2. setup_notches 3. tracking_setup 4. short g3 stream Parameters: sleep (bool, optional): If True, will sleep for 1 sec after creating the tunefile, which is required for preventing filename collisions in end-to-end testing. """ self._run_setup(action='uxm_setup', action_time=time.time(), session=session) if params.get('sleep', True): time.sleep(1) return True, "Wrote tune files"
[docs] @ocs_agent.param('tag', default=None) def take_noise(self, session, params=None): """take_noise(tag=None) **Task** - Takes a short noise timestream Parameters: tag (str, optional): User tag to add to the g3 stream. """ action = 'take_noise' action_time = time.time() tag = 'oper,noise' if params.get('tag') is not None: tag += f',{params["tag"]}' streamer = self._new_streamer(action=action, action_time=action_time, tag=tag) now = time.time() streamer.stream_between(now, now + 30, wait=False) session.data['noise_file'] = streamer.file_list[0] time.sleep(1) return True, "Took noise data"
[docs] def uxm_relock(self, session, params=None): """uxm_relock() **Task** - Normally this wouldn't involve a full find-freq, but for emulation purposes it's ok if this is the same as uxm_setup. """ self._run_setup(action='uxm_relock', action_time=time.time(), session=session) time.sleep(1) return True, "Wrote tune files"
[docs] @ocs_agent.param('wait', default=True) @ocs_agent.param('kwargs', default=None) @ocs_agent.param('tag', default=None) def take_iv(self, session, params=None): """take_iv(wait=True, tag=None) **Task** - Creates files generated associated with iv taking / analysis Parameters: wait (bool, optional): If true, will wait for the 5 seconds where fake IV data is generated kwargs : dict Additional kwargs to pass to the ``take_iv`` function. Ignored in the emulator. tag (str, optional): User tag to add to the g3 stream. """ action = 'take_iv' action_time = time.time() files = ['iv_analyze.npy', 'iv_bias_all.npy', 'iv_info.npy'] tag = 'oper,iv' if params.get('tag') is not None: tag += f',{params["tag"]}' streamer = self._new_streamer(action=action, action_time=action_time, tag=tag) now = time.time() streamer.stream_between(now, now + 5, wait=params['wait']) for f in files: self._write_smurf_file(f, action, action_time=action_time) return True, "Wrote IV files"
[docs] @ocs_agent.param('wait', default=True) @ocs_agent.param('tag', default=None) def take_bias_steps(self, session, params=None): """take_bias_steps(wait=True, tag=None) **Task** - Creates files associated with taking bias steps Parameters: wait (bool, optional): If true, will wait for the 5 seconds where fake data is generated tag (str, optional): User tag to add to the g3 stream. """ action = 'take_bias_steps' action_time = time.time() files = ['bias_step_analysis.npy'] tag = 'oper,bias_steps' if params.get('tag') is not None: tag += f',{params["tag"]}' streamer = self._new_streamer(action=action, action_time=action_time, tag=tag) now = time.time() streamer.stream_between(now, now + 5, wait=params['wait']) for f in files: self._write_smurf_file(f, action, action_time=action_time) return True, "Wrote Bias Step Files"
[docs] @ocs_agent.param('wait', default=True) @ocs_agent.param('tag', default=None) def take_bgmap(self, session, params=None): """take_bgmap(wait=True, tag=None) **Task** - Creates files associated with taking a bias group mapping. Parameters: wait (bool, optional): If true, will wait for the 5 seconds where fake data is generated tag (str, optional): User tag to add to the g3 stream. """ action = 'take_bgmap' action_time = time.time() tag = 'oper,bgmap' if params.get('tag') is not None: tag += f',{params["tag"]}' streamer = self._new_streamer(action=action, action_time=action_time, tag=tag) now = time.time() streamer.stream_between(now, now + 5, wait=params['wait']) files = ['bg_map.npy', 'bias_step_analysis.npy'] for f in files: self._write_smurf_file(f, action, action_time=action_time) return True, "Finished taking bgmap"
[docs] def bias_dets(self, session, params=None): """bias_dets() **Task** - Creates files associated with biasing dets, which is none. """ time.sleep(1) return True, 'Wrote det biasing files'
[docs] @ocs_agent.param('duration', default=None) @ocs_agent.param('kwargs', default=None) @ocs_agent.param('use_stream_between', default=False, type=bool) @ocs_agent.param('start_offset', default=0, type=float) @ocs_agent.param('subtype', default=None) @ocs_agent.param('tag', default=None) def stream(self, session, params): """stream(duration=None, use_stream_between=False, start_offset=0, tag=None) **Process** - Generates example fake-files organized in the same way as they would be a regular smurf-stream. For end-to-end testing, we want an example of a pysmurf-ancilliary file, and then regular g3 that rotate at regular intervals. The content of the files here don't match what actual G3 or pysmurf files look like, however the directory structure is the same. Parameters: duration (float, optional): If set, will stop stream after specified amount of time (sec). kwargs : dict A dictionary containing additional keyword arguments to pass. Ignored by the emulator. use_stream_between (bool, optional): If True, will use the DataStreamer's `stream_between` function instead of writing frames one at a time. This allows you to write an entire timestream for the specified duration without waiting. start_offset (float, optional): If set, this will add an offset to the start time passed to the `stream_between` function, allowing you to create offsets between streams taken at the same time. subtype : string, optional Operation subtype used to tag the stream. Ignored by the emulator. tag (str, optional): User tag to add to the g3 stream. """ if self.tune is None: raise ValueError("No tune loaded!") # Write initial smurf metadata if 'duration' in params: action = 'take_g3_data' else: action = 'stream_g3_on' action_time = time.time() files = ['freq.txt', 'mask.txt'] for f in files: self._write_smurf_file(f, action, action_time=action_time) start_time = time.time() + params['start_offset'] end_time = None if params.get('duration') is not None: end_time = start_time + params['duration'] tag = 'obs,cmb' if params.get('tag') is not None: tag += f',{params["tag"]}' streamer = self._new_streamer(action=action, action_time=action_time, tag='obs,cmb') session.data['session_id'] = streamer.session_id session.data['g3_files'] = streamer.file_list if params['use_stream_between']: streamer.stream_between(start_time, end_time) return True, "Finished Stream" self.streaming = True while self.streaming: streamer.write_next() if end_time is not None: if time.time() > end_time: break streamer.end_file() time.sleep(1) return True, "Finished Stream"
def _stop_stream(self, session, params=None): if self.streaming: session.set_status('stopping') self.streaming = False return True, 'requesting to stop taking data' else: return False, 'agent is not currently streaming'
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--stream-id', required=True, help='Stream ID for fake smurf stream') pgroup.add_argument('--base-dir', required=True, help="Base directory where data should be written") pgroup.add_argument('--file-duration', default=10 * 60, type=float, help="Time in sec before rotating g3 files") pgroup.add_argument('--nchans', default=1024, type=int, help="Number of channels to stream from") pgroup.add_argument('--sample-rate', default=200, type=float, help="Sample rate for streaming data") pgroup.add_argument('--frame-len', default=2, type=float, help="Time per G3 data frame (seconds)") pgroup.add_argument('--drop-chance', default=0, type=float, help="Fractional chance to drop samples") return parser def main(args=None): parser = make_parser() args = site_config.parse_args(agent_class='SmurfFileEmulator', parser=parser, args=args) txaio.start_logging(level=os.environ.get('LOGLEVEL', 'info')) agent, runner = ocs_agent.init_site_agent(args) file_em = SmurfFileEmulator(agent, args) agent.register_task('uxm_setup', file_em.uxm_setup) agent.register_task('uxm_relock', file_em.uxm_relock) agent.register_task('take_iv', file_em.take_iv) agent.register_task('take_bias_steps', file_em.take_bias_steps) agent.register_task('take_bgmap', file_em.take_bgmap) agent.register_task('bias_dets', file_em.bias_dets) agent.register_task('take_noise', file_em.take_noise) agent.register_process('stream', file_em.stream, file_em._stop_stream) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()