Source code for socs.agents.holo_fpga.agent

import argparse
import os
import time

import numpy as np
import txaio
import yaml
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock

ON_RTD = os.environ.get("READTHEDOCS") == "True"
if not ON_RTD:
    import casperfpga
    from holog_daq import fpga_daq3, synth3


[docs] class FPGAAgent: """ Agent for programming FPGA and data acquisition for holography. Args: config_file (str): ocs-site-configs/uchicago/field/holog_config.yaml """ def __init__(self, agent, config_file): self.fpga = None self.initialized = False # self.take_data = False self.agent = agent self.log = agent.log self.lock = TimeoutLock() agg_params = {"frame_length": 10 * 60} # [sec] self.agent.register_feed( "fpga", record=True, agg_params=agg_params, buffer_time=0 ) # Load dictionary of specific mirror paramters, since some parameters # like limits and translate vary over different FTSes This is loaded # from a yaml file, which is assumed to be in the $OCS_CONFIG_DIR # directory. if config_file is None: raise Exception("No config file specified for the FTS mirror config") else: config_file_path = os.path.join(os.environ["OCS_CONFIG_DIR"], config_file) with open(config_file_path) as stream: self.holog_configs = yaml.safe_load(stream) if self.holog_configs is None: raise Exception("No mirror configs in config file.") self.log.info(f"Loaded mirror configs from file {config_file_path}") self.baseline = self.holog_configs.pop("baseline", None) self.roach = self.holog_configs.pop("roach", None) self.path_to_roach_init = self.holog_configs.pop("path_to_roach_init", None) self.python2_env = self.holog_configs.pop("python2_env", None) # The other mirror configs (speed, timeout) are optional and # have defaults so we leave them as the dictionary. if self.roach is None or self.baseline is None: raise Exception("IP address and channels must be included " "in the holography configuration keys.") self.roach, self.opts, self.baseline = fpga_daq3.roach2_init() err = os.system(self.python2_env + self.path_to_roach_init) assert err == 0 print("Connecting to server %s ... " % (self.roach)) self.fpga = casperfpga.CasperFpga(self.roach) time.sleep(1) if self.fpga.is_connected(): print("ok\n")
[docs] def take_data(self, session, params=None): """take_data() **Task** - A task to take data from the FPGA. Examples: Example for calling in a client:: from ocs.ocs_client import OCSClient agent_fpga = OCSClient("fpga") # create agent agent_fpga.take_data() # take data Notes: An example of the session data:: >>> response.session['data'] {"timestamp": 1601924482.722671, "block_name": "fpga", "data": {"amp_AA": 293.644, "amp_BB": 33.752, "amp_AB": 33.752, "arr_P": 33.752} } """ with self.lock.acquire_timeout(timeout=3, job="take_data") as acquired: if not acquired: self.log.warn( f"Could not set position because lock held by {self.lock.job}" ) return False, "Could not acquire lock" # Grab synthesizer settings here (which FPGA bins to integrate over) self.synth_settings = synth3.SynthOpt() self.synth_settings.IGNORE_PEAKS_BELOW = int(655) self.synth_settings.IGNORE_PEAKS_ABOVE = int(660) # Take data here arr_aa, arr_bb, arr_ab, arr_phase, arr_index = fpga_daq3.TakeAvgData( self.baseline, self.fpga, self.synth_settings ) # Data dictionary is what we will send to the data feed: data = {"timestamp": time.time(), "block_name": "fpga", "data": {}} arr_AA = np.array(fpga_daq3.running_mean(arr_aa.tolist(), 1)) arr_BB = np.array(fpga_daq3.running_mean(arr_bb.tolist(), 1)) arr_AB = np.array(fpga_daq3.running_mean(arr_ab.tolist(), 1)) arr_P = np.array(fpga_daq3.running_mean(arr_phase.tolist(), 1)) n_channels = np.size(arr_AA) amp_AA = arr_AA[int(n_channels / 2)] amp_BB = arr_BB[int(n_channels / 2)] amp_AB = np.power(arr_AB[int(n_channels / 2)], 1) amp_P = np.remainder(arr_P[int(n_channels / 2)], 360.0) data["data"]["amp_AA"] = amp_AA data["data"]["amp_BB"] = amp_BB data["data"]["amp_AB"] = amp_AB data["data"]["amp_P"] = amp_P self.agent.publish_to_feed("fpga", data) session.data.update(data["data"]) return True, "Data acquired."
def make_parser(parser=None): """Build the argument parser for the Agent. Allows sphinx to automatically build documentation based on this function. """ if parser is None: parser = argparse.ArgumentParser() # Add options specific to this agent. pgroup = parser.add_argument_group("Agent Options") pgroup.add_argument("--config_file") return parser def main(args=None): # For logging txaio.use_twisted() txaio.make_logger() # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) parser = make_parser() # Interpret options in the context of site_config. args = site_config.parse_args(agent_class="FPGAAgent", parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) fpga_agent = FPGAAgent(agent, args.config_file) # agent.register_task("init_FPGA", fpga_agent.init_FPGA) agent.register_task("take_data", fpga_agent.take_data) runner.run(agent, auto_reconnect=True) if __name__ == "__main__": main()