Source code for socs.agents.fls.agent

import argparse
import time

import numpy as np
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock

from socs.agents.fls.drivers import DLCSmart

MAX_FREQ = 880.
MIN_FREQ = 20.


def _within(val, target, tolerance=1e-2):
    return abs(val - target) <= tolerance


[docs] class FLSAgent: """ Agent for operating the lasers in the Frequency-selectable Laser Source (FLS) calibrator instrument for passband measurements. Args: ip (str): IP address for the DLC Smart laser controller port (int, optional): TCP port for DLC Smart communication. Default is 1998 """ def __init__(self, agent, ip, port=1998): self.lock = TimeoutLock() self.agent = agent self.log = agent.log self.ip = ip self.port = port self.dlcsmart = None self.initialized = False self.take_data = False self.run_sweep = False # for internal referencing self.lasers_on = False self.tx_bias_amp = None self.tx_bias_offset = None self.set_freq = None self.actual_freq = None self.scan_mode = None self.scan_min_freq = None self.scan_max_freq = None self.scan_step = None self.scan_direction = None self.integration_time = None agg_params = {'frame_length': 60} self.agent.register_feed('sampling_data', record=True, agg_params=agg_params, buffer_time=1)
[docs] @ocs_agent.param('auto_acquire', type=bool, default=False) def initialize(self, session, params=None): """ initialize(auto_acquire=False) **Task** - Initialize the connection to the DLC Smart Parameters: auto_acquire (bool): If True, start acquisition immediately after initialization. Default value is False. """ if self.initialized: return True, "Already initialized" with self.lock.acquire_timeout(0, job='initialize') as acquired: if not acquired: self.log.warn(f"Could not start initialize because {self.lock.job}" "is already running") return False, "Could not acquire lock." # Make the connection and read out the welcome message try: self.dlcsmart = DLCSmart(ip_addr=self.ip, port=self.port) self.dlcsmart.drain_buffer() except ConnectionError: self.log.error("could not establish connection to DLC Smart") return False, "FLS agent initialization failed" # Read the voltage bias and voltage offset bias_read = self.dlcsmart.check_bias() self.tx_bias_amp = bias_read[0] self.tx_bias_offset = bias_read[1] self.log.info(f'Tx bias amplitude: {self.tx_bias_amp}') self.log.info(f'Tx bias offset: + {self.tx_bias_offset}') # Read the laser emission state (on/off) lasers_on = self.dlcsmart.check_laser_emission() if "#t" in lasers_on: self.lasers_on = True self.log.info('Lasers are on.') elif "#f" in lasers_on: self.lasers_on = False self.log.info('Lasers are off.') else: print(lasers_on) self.log.warn("Could not determine if lasers are on!") # Read the actual frequency self.actual_freq = self.dlcsmart.get_actual_frequency() self.log.info(f'Actual frequency: {self.actual_freq}') # Read the scan parameters scan_params = self.dlcsmart.check_scan_params() scan_mode = scan_params[0] if "#t" in scan_mode: self.scan_mode = 'fast' elif "#f" in scan_mode: self.scan_mode = 'precise' else: self.log.warn('Could not interpret scan mode') self.scan_mode = scan_mode self.scan_min_freq = scan_params[1] self.scan_max_freq = scan_params[2] scan_step = scan_params[3] self.scan_step = abs(scan_step) self.scan_direction = np.sign(scan_step) self.initialized = True if params['auto_acquire']: resp = self.agent.start('acq', params={}) self.log.info(f'Response from acq.start(): {resp[1]}') return True, "FLS agent initialized"
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def acq(self, session, params): """ acq(test_mode=False) **Process** - Starts the 'sampling' data acquisition from the DLC Smart. Parameters: test mode (bool): If True, the acquisition loop breaks after one iteration. Notes: The data collected are stored in session data in the structure:: >> response.session['data'] {'set_frequency': 110.0, 'actual_frequency': 109.3425, 'photocurrent': 0.1124, 'bias_voltage': 0.999834227, 'bias_offset': -0.498235892, 'lasers_on': True, 'scan_mode': 'fast', 'scan_min_frequency': 120.0, 'scan_max_frequency': 180.0, 'scan_step': 0.05, 'scan_direction': 1, 'integration_time': 299.3421 'timestamp': 1771277799.562098} """ with self.lock.acquire_timeout(0, job='acq') as acquired: if not acquired: self.log.warn(f"Could not start sampling because {self.lock.job}" "is already running") return False, "Could not acquire lock." last_time = time.time() self.take_data = True pm = Pacemaker(1 / 3, quantize=False) while self.take_data: pm.sleep() if time.time() - last_time > 1: last_time = time.time() if not self.lock.release_and_acquire(timeout=12): self.log.warn(f"acq: Failed to re-acquire sampling lock, " f"currently held by {self.lock.job}.") continue try: data = self.dlcsmart.sampling() if session.degraded: self.log.info("Connection re-established.") session.degraded = False except ConnectionError: self.log.error("Failed to get data from DLC Smart. Check network connection") session.degraded = True time.sleep(1) continue self.set_freq = data['set_frequency'] self.actual_freq = data['actual_frequency'] self.tx_bias_amp = data['bias_voltage'] self.tx_bias_offset = data['bias_offset'] self.scan_mode = data['scan_mode'] self.scan_min_freq = data['scan_min_frequency'] self.scan_max_freq = data['scan_max_frequency'] self.scan_step = data['scan_step'] self.scan_direction = data['scan_direction'] self.lasers_on = data['lasers_on'] self.integration_time = data['integration_time'] sampling_data = {} for key, val in data.items(): sampling_data[key] = val data['timestamp'] = time.time() session.data = data pub_data = {'timestamp': time.time(), 'block_name': 'sampling_data', 'data': sampling_data} self.agent.publish_to_feed('sampling_data', pub_data) if params['test_mode']: break self.agent.feeds['sampling_data'].flush_buffer() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params): """ Stops sampling process. """ if self.take_data: self.take_data = False return True, 'requested to stop taking sampling data.' else: return False, 'acq is not currently running.'
[docs] @ocs_agent.param('state', type=str, choices=['on', 'off']) def toggle_laser_power(self, session, params): """ toggle_laser_power(state) **Task** - Enable or disable emission from both lasers Parameters: state (str): State ('on' or 'off') to set the lasers to """ state = params['state'] with self.lock.acquire_timeout(timeout=12, job='toggle_laser_power') as acquired: if not acquired: self.log.warn(f"Could not start Task because " f"{self.lock.job} is already running") return False, "Could not acquire lock" laser_status = self.lasers_on if laser_status: self.log.info('Current laser state is on.') on_off = 'on' elif laser_status is False: self.log.info('Current laser state is off.') on_off = 'off' if on_off == state: return True, f"Laser is already {state}" bias_amp = self.tx_bias_amp bias_offset = self.tx_bias_offset if bias_amp != 0.0 or bias_offset != 0.0: self.log.warn(f'Bias amplitude is {bias_amp} and bias offset ' f'is {bias_offset}. Setting bias to zero, then ' f'turning lasers {state}.') self.dlcsmart.set_bias_to_zero() time.sleep(0.3) bias_amp, bias_offset = self.dlcsmart.check_bias() if bias_amp != 0.0 or bias_offset != 0.0: return False, "Bias could not be set to zero so did not toggle laser power." countdown = 10 while countdown > 0: if session.status == "running": self.log.warn(f'Bias amplitude and bias offset are zero. Check that ' f'U-shaped link is unplugged. CANCEL TASK NOW IF NOT. ' f'Task will proceed in {countdown} seconds.') time.sleep(1) countdown -= 1 else: return False, "Laser power has not been toggled." self.log.info(f'Proceeding to toggle laser power {state}.') if state == 'on': self.dlcsmart.laser_emission_on() elif state == 'off': self.dlcsmart.laser_emission_off() time.sleep(0.3) laser_status = self.dlcsmart.check_laser_emission() if "#t" in laser_status: self.lasers_on = True return True, "Lasers turned on" elif "#f" in laser_status: self.lasers_on = False return True, "Lasers turned off"
def _abort_laser_power(self, session, params): if session.status == "running": session.set_status("stopping")
[docs] @ocs_agent.param('bias', type=str, choices=['default', 'zero']) def set_bias(self, session, params): """ set_bias(bias) **Task** - Set the bias amplitude and offset of the lasers to a preset condition. Parameters: bias (str): Preset condition to set the bias for the lasers. Options are 'zero' to set the bias to zero, or 'default' to set the bias to default. The default bias amplitude is 1.0, and the default bias offset is -0.5. """ bias_to_set = params['bias'] with self.lock.acquire_timeout(timeout=12, job='set_bias') as acquired: if not acquired: self.log.warn(f"Could not start Task because " f"{self.lock.job} is already running") return False, "Could not acquire lock" if bias_to_set == 'zero': self.dlcsmart.set_bias_to_zero() elif bias_to_set == 'default': self.dlcsmart.set_bias_to_default() time.sleep(3) check_bias_amp = self.tx_bias_amp check_bias_offset = self.tx_bias_offset if bias_to_set == 'zero' and (check_bias_amp, check_bias_offset) == (0., 0.): self.log.info('Bias successfully set to zero.') elif bias_to_set == 'default' and round(check_bias_amp, 1) == 1.0 and round(check_bias_offset, 1) == -0.5: self.log.info('Bias successfully set to default.') else: bias_amp, bias_offset = self.dlcsmart.check_bias() if bias_to_set == 'zero' and (bias_amp, bias_offset) == (0., 0.): self.log.info('Bias successfully set to zero.') elif bias_to_set == 'default' and round(bias_amp, 1) == 1.0 and round(bias_offset, 1) == -0.5: self.log.info('Bias successfully set to default.') else: self.log.info(f"Bias amp is {check_bias_amp} and bias offset is {check_bias_offset}.") return False, "Bias not successfully set." return True, f"Bias successfully set to {bias_to_set}."
[docs] @ocs_agent.param('integration_time', type=float) def set_integration_time(self, session, params): """ set_integration_time(integration_time) **Task** - Set the integration time of the laser system. Time is in milliseconds. Parameters: integration_time (float): The integration time in milliseconds. """ int_time = params['integration_time'] self.dlcsmart.param_set("lockin:integration-time", int_time) return True, f"Commanded integration time to be set to {int_time}."
[docs] @ocs_agent.param('frequency', type=float, check=lambda x: MIN_FREQ <= x < MAX_FREQ) def set_frequency(self, session, params): """ set_frequency(frequency) **Task** - Set the frequency of the laser system. Frequency must be between 20 GHz and 880 GHz. Parameters: frequency (float): The frequency to set the laser to. """ set_frequency = params['frequency'] with self.lock.acquire_timeout(timeout=12, job='set_frequency') as acquired: if not acquired: self.log.warn(f"Could not start Task because " f"{self.lock.job} is already running") return False, "Could not acquire lock" # Set the new frequency response = self.dlcsmart.set_frequency(set_frequency) if response == '0': return True, f"Commanded the DLC Smart to set frequency to {set_frequency}." else: return False, "Frequency set command not received by the DLC Smart."
[docs] @ocs_agent.param('min_frequency', type=float, check=lambda x: MIN_FREQ <= x < MAX_FREQ) @ocs_agent.param('max_frequency', type=float, check=lambda x: MIN_FREQ <= x < MAX_FREQ) @ocs_agent.param('start_direction', type=int, choices=[-1, 1]) @ocs_agent.param('frequency_step', type=float, default=0.05, check=lambda x: x >= 0.01) @ocs_agent.param('int_time', type=float, default=300., check=lambda x: 0.5 < x <= 3000) def run_frequency_sweeps(self, session, params): """ run_frequency_sweeps(min_frequency, max_frequency, start_direction, \ frequency_step, num_of_sweeps) **Task** - Run frequency sweeps between the two frequency values. Parameters: min_frequency (float): Minimum frequency for the sweeps (GHz). max_frequency (float): Maximum frequency for the sweeps (GHz). start_direction (int): Indicates increasing or decreasing frequency. Use start_direction = 1 for increasing frequency, or start_direction = -1 for decreasing frequency frequency_step (float): Step size between frequencies during the sweep (GHz). Must be at least 0.01 GHz. int_time (float): Integration time for each step of the sweep (ms). Default chooses the last set integration time. Note: This task only sends commands to the DLC Smart, it does not wait for the end of the frequency sweep. As a result, it returns quickly, and the user needs to call the stop separately using stop_frequency_sweep. """ min_freq = params['min_frequency'] max_freq = params['max_frequency'] start_dir = params['start_direction'] freq_step = params['frequency_step'] int_time = params['int_time'] if int_time == 0.0: int_time = self.integration_time assert min_freq < max_freq, "max_freq must be greater than min_freq!" with self.lock.acquire_timeout(timeout=12, job='set_frequency') as acquired: if not acquired: self.log.warn(f"Could not start Task because " f"{self.lock.job} is already running") return False, "Could not acquire lock" self.log.info(f'Scan called with min frequency {min_freq}, max frequency {max_freq}, ' f'start direction {start_dir}, freq step {freq_step}.') self.dlcsmart.clear_scan_data() self.log.info("Cleared stored scan data from the DLC Smart memory") if start_dir == 1 and not _within(self.actual_freq, min_freq): self.log.warn('run_frequency_sweeps called with increasing frequency, ' 'but laser is not at min_freq.') if min_freq != self.set_freq: self.log.warn(f'Set frequency is {self.set_freq} and min_freq is {min_freq}.') if start_dir == -1 and not _within(self.actual_freq, max_freq): self.log.warn('run_frequency_sweeps called with decreasing frequency, ' 'but laser is not at max_freq.') if max_freq != self.set_freq: self.log.warn(f'Set frequency is {self.set_freq} and max_freq is {max_freq}.') self.dlcsmart.set_scan_params(min_freq, max_freq, freq_step, start_dir, int_time) time.sleep(0.1) csp = self.dlcsmart.check_scan_params() fast_check = csp[0] if "#f" in fast_check: self.log.warn("Scan is not in fast mode. Attempting to set to fast mode.") self.dlcsmart.param_set("frequency:scan-mode-fast", "#t") time.sleep(0.1) csp2 = self.dlcsmart.check_scan_params() fast_check_2 = csp2[0] if "#f" in fast_check_2: self.log.warn("Scan is not in fast mode on attempt 2, so scan cannot be " "commanded via the Agent. Please set scan mode to fast in " "the GUI.") return False, "Could not start a scan because scan mode could not be set to fast." min_freq_check = csp[1] max_freq_check = csp[2] freq_step_check = abs(csp[3]) start_dir_check = np.sign(csp[3]) int_time_check = csp[4] if min_freq_check != min_freq: self.log.warn(f"Minimum frequency set to {min_freq_check}, not {min_freq}.") if max_freq_check != max_freq: self.log.warn(f"Maximum frequency set to {max_freq_check}, not {max_freq}.") if freq_step_check != freq_step: self.log.warn(f"Frequency step size set to {freq_step_check}, not {freq_step}.") if start_dir_check != start_dir: self.log.warn(f"Start direction set to {start_dir_check}, not {start_dir}.") if not _within(int_time_check, int_time, tolerance=1e-1): self.log.warn(f"Integration time set to {int_time_check}, not {int_time}.") self.dlcsmart.start_scan() return True, f"Started scan from {min_freq} GHz to {max_freq} GHz with step size {freq_step} and direction {start_dir}."
[docs] @ocs_agent.param("_") def stop_frequency_sweep(self, agent, params): """ stop_frequency_sweep() **Task** - Send a stop command to the DLC Smart to stop running a frequency sweep. This command may be run during or at the end of a sweep. """ with self.lock.acquire_timeout(timeout=12, job='set_frequency') as acquired: if not acquired: self.log.warn(f"Could not start Task because " f"{self.lock.job} is already running") return False, "Could not acquire lock" self.dlcsmart.stop_scan() return True, "Sent stop scan to the DLC Smart."
def make_parser(parser=None): """ Build the argument parser for the Agent. Allows sphinx to automatically build documenation based on this function. """ if parser is None: parser = argparse.ArgumentParser() # Add options specific to this agent pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--ip') pgroup.add_argument('--port', default=1998) pgroup.add_argument('--mode', choices=['init', 'acq']) return parser def main(args=None): parser = make_parser() args = site_config.parse_args(agent_class='FLSAgent', parser=parser, args=args) init_params = False if args.mode == 'init': init_params = {'auto_acquire': False} elif args.mode == 'acq': init_params = {'auto_acquire': True} agent, runner = ocs_agent.init_site_agent(args) fls_agent = FLSAgent(agent, args.ip, args.port) agent.register_task('initialize', fls_agent.initialize, startup=init_params) agent.register_task('toggle_laser_power', fls_agent.toggle_laser_power, aborter=fls_agent._abort_laser_power) agent.register_task('set_bias', fls_agent.set_bias) agent.register_task('set_integration_time', fls_agent.set_integration_time) agent.register_task('set_frequency', fls_agent.set_frequency) agent.register_task('run_frequency_sweeps', fls_agent.run_frequency_sweeps) agent.register_task('stop_frequency_sweep', fls_agent.stop_frequency_sweep) agent.register_process('acq', fls_agent.acq, fls_agent._stop_acq) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()