Source code for socs.agents.lakeshore240.agent

import argparse
import os
import time
import warnings
from typing import Optional

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock

from socs.Lakeshore.Lakeshore240 import Module


class LS240_Agent:
    """OCS agent class that operates a Lakeshore 240 module.

    The agent exposes tasks to initialize the module, read and write channel
    settings, and upload calibration curves, plus an ``acq`` process that
    publishes channel readings to the ``temperatures`` feed.

    Args:
        agent (ocs_agent.OCSAgent): OCS agent object; used for logging,
            feed registration/publishing and for starting other operations.
        port (str): Path of the device node passed to ``Module``. Defaults
            to "/dev/ttyUSB0".
        f_sample (float): Sampling frequency in Hz used by ``acq`` when the
            caller does not pass ``sampling_frequency``. Defaults to 2.5.
    """

    def __init__(self, agent, port="/dev/ttyUSB0", f_sample=2.5):
        self.agent: ocs_agent.OCSAgent = agent
        self.log = agent.log
        # Every operation that talks to the module takes this lock.
        self.lock = TimeoutLock()

        self.port = port
        # Created by init_lakeshore; None until then.
        self.module: Optional[Module] = None

        self.f_sample = f_sample

        self.initialized = False
        self.take_data = False

        # Registers Temperature and Voltage feeds
        agg_params = {
            'frame_length': 60,
        }
        self.agent.register_feed('temperatures',
                                 record=True,
                                 agg_params=agg_params,
                                 buffer_time=1)

    # Task functions.
    def init_lakeshore(self, session, params=None):
        """init_lakeshore(auto_acquire=False)

        **Task** - Perform first time setup of the Lakeshore 240 Module.

        Parameters:
            auto_acquire (bool, optional): Starts data acquisition after
                initialization if True. Defaults to False.

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False only when the lock
            could not be acquired within 3 seconds.
        """
        if params is None:
            params = {}

        auto_acquire = params.get('auto_acquire', False)

        if self.initialized:
            return True, "Already Initialized Module"

        with self.lock.acquire_timeout(3, job='init') as acquired:
            if not acquired:
                self.log.warn("Could not start init because "
                              "{} is already running".format(self.lock.job))
                return False, "Could not acquire lock."

            self.module = Module(port=self.port)
            print("Initialized Lakeshore module: {!s}".format(self.module))
            session.add_message("Lakeshore initialized with ID: %s" % self.module.inst_sn)

        self.initialized = True

        # Start data acquisition if requested
        if auto_acquire:
            self.agent.start('acq')

        return True, 'Lakeshore module initialized.'

    def set_values(self, session, params=None):
        """set_values(channel, sensor=None, auto_range=None, range=None,\
                      current_reversal=None, units=None, enabled=None, name=None)

        **Task** - Set sensor parameters for a Lakeshore240 Channel.

        Args:
            channel (int): Channel number to set. Valid choices are 1-8.
            sensor (int, optional): Specifies sensor type. See
                :func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
                possible types.
            auto_range (int, optional): Specifies if channel should use
                autorange. Must be 0 or 1.
            range (int, optional): Specifies range if auto_range is false.
                Only settable for NTC RTD. See
                :func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
                possible ranges.
            current_reversal (int, optional): Specifies if input current
                reversal is on or off. Always 0 if input is a diode.
            units (int, optional): Specifies preferred units parameter, and
                sets the units for alarm settings. See
                :func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
                possible units. The key ``unit`` is accepted as an alias.
            enabled (int, optional): Sets if channel is enabled.
            name (str, optional): Sets name of channel.

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False when the lock is busy
            or the module has not been initialized yet.

        Raises:
            KeyError: If ``channel`` is missing from ``params``.
        """
        if params is None:
            params = {}

        with self.lock.acquire_timeout(0, job='set_values') as acquired:
            if not acquired:
                self.log.warn("Could not start set_values because "
                              "{} is already running".format(self.lock.job))
                return False, "Could not acquire lock."

            if self.module is None:
                return False, "Lakeshore module is not initialized."

            # The documented parameter is 'units'; 'unit' is what this task
            # historically read, so both spellings are honored.
            unit = params.get('units')
            if unit is None:
                unit = params.get('unit')

            self.module.channels[params['channel'] - 1].set_values(
                sensor=params.get('sensor'),
                auto_range=params.get('auto_range'),
                range=params.get('range'),
                current_reversal=params.get('current_reversal'),
                unit=unit,
                enabled=params.get('enabled'),
                name=params.get('name'),
            )

        return True, 'Set values for channel {}'.format(params['channel'])

    @ocs_agent.param('channel', type=int, check=lambda x: 1 <= x <= 8)
    def get_values(self, session, params):
        """get_values(channel)

        **Task** - Get the set values for a particular channel.

        Parameters:
            channel (int): Channel to get the set sensor type, range,
                auto_range, current_reversal, units, and enabled values
                for. Valid values for channel are 1-8.

        The most recent data collected is stored in session data in the
        structure::

            >>> response.session['data']
            {"fields":
                {"sensor": "Diode",
                 "range": 1000,
                 "auto_range": True,
                 "current_reversal": True,
                 "unit": "K",
                 "enabled": True},
             "timestamp":1601925677.6914878}

        NOTE(review): this method assigns the dictionary returned by the
        channel's ``get_values()`` directly to ``session.data`` and adds no
        "fields"/"timestamp" wrapper itself; confirm the example above
        against the actual return shape.

        Returns:
            tuple: ``(ok, result)``. On success ``result`` is the dictionary
            of current channel values; on failure it is an error string.
        """
        with self.lock.acquire_timeout(0, job='get_values') as acquired:
            if not acquired:
                self.log.warn(f"Could not start Task because "
                              f"{self.lock.job} is already running")
                return False, "Could not acquire lock"

            if self.module is None:
                return False, "Lakeshore module is not initialized."

            channel_num = params["channel"]
            current_values = self.module.channels[channel_num - 1].get_values()

            current_sensor = current_values['sensor']
            current_range = current_values['range']
            current_auto_range = current_values['auto_range']
            current_current_reversal = current_values['current_reversal']
            current_unit = current_values['unit']
            current_enabled = current_values['enabled']

            session.add_message(f'Sensor for channel {channel_num} is {current_sensor}')
            session.add_message(f'Range for channel {channel_num} is {current_range} Ohms')
            session.add_message(f'Auto Range for channel {channel_num} is {current_auto_range}')
            session.add_message(f'Current Reversal for channel {channel_num} is {current_current_reversal}')
            session.add_message(f'Unit for channel {channel_num} is {current_unit}')
            session.add_message(f'Enabled for channel {channel_num} is {current_enabled}')

            session.data = current_values

        return True, current_values

    def upload_cal_curve(self, session, params=None):
        """upload_cal_curve(channel, filename)

        **Task** - Upload a calibration curve to a channel.

        Parameters:
            channel (int): Channel number, 1-8.
            filename (str): Filename for calibration curve.

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False when the lock is busy
            or the module has not been initialized yet.

        Raises:
            KeyError: If ``channel`` or ``filename`` is missing from
                ``params``.
        """
        if params is None:
            params = {}

        channel_num = params['channel']
        filename = params['filename']

        with self.lock.acquire_timeout(0, job='upload_cal_curve') as acquired:
            if not acquired:
                self.log.warn("Could not start upload_cal_curve because "
                              "{} is already running".format(self.lock.job))
                return False, "Could not acquire lock."

            if self.module is None:
                return False, "Lakeshore module is not initialized."

            # Keep the channel number separate from the channel object so the
            # messages below report the number the caller asked for.
            channel = self.module.channels[channel_num - 1]
            self.log.info("Starting upload to channel {}...".format(channel_num))
            channel.load_curve(filename)
            self.log.info("Finished uploading.")

        return True, "Uploaded curve to channel {}".format(channel_num)

    def acq(self, session, params=None):
        """acq(sampling_frequency=2.5)

        **Process** - Start data acquisition.

        Parameters:
            sampling_frequency (float): Sampling frequency for data
                collection. Defaults to 2.5 Hz. Must be positive.

        The most recent data collected is stored in session data in the
        structure::

            >>> response.session['data']
            {"fields":
                {"Channel_1": {"T": 99.26, "V": 99.42},
                 "Channel_2": {"T": 99.54, "V": 101.06},
                 "Channel_3": {"T": 100.11, "V":100.79},
                 "Channel_4": {"T": 98.49, "V": 100.77},
                 "Channel_5": {"T": 97.75, "V": 101.45},
                 "Channel_6": {"T": 99.58, "V": 101.75},
                 "Channel_7": {"T": 98.03, "V": 100.82},
                 "Channel_8": {"T": 101.14, "V":101.01}},
             "connection": {"last_attempt": 1601925677.6914878,
                            "connected": True},
             "timestamp":1601925677.6914878}

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False when the sampling
            frequency is not positive, the lock is busy, or the lock could
            not be re-acquired before a reconnection attempt.
        """
        if params is None:
            params = {}

        f_sample = params.get('sampling_frequency')
        # If f_sample is None, use value passed to Agent init
        if f_sample is None:
            f_sample = self.f_sample

        if f_sample <= 0:
            return False, "sampling_frequency must be positive."

        # 0.01 s is subtracted from the period before sleeping; clamp at
        # zero because time.sleep() rejects negative values (rates > 100 Hz).
        sleep_time = max(0.0, 1 / f_sample - 0.01)

        with self.lock.acquire_timeout(0, job='acq') as acquired:
            if not acquired:
                self.log.warn("Could not start acq because {} is already running"
                              .format(self.lock.job))
                return False, "Could not acquire lock."

            self.take_data = True

            session.data = {"fields": {}}

            while self.take_data:
                current_time = time.time()
                data = {
                    'timestamp': current_time,
                    'connection': {},
                    'block_name': 'temps',
                    'data': {}
                }

                # Try to re-initialize if connection lost
                if not self.initialized:
                    if not self.lock.release_and_acquire(timeout=10):
                        self.log.warn(f"Could not re-acquire lock now held by {self.lock.job}.")
                        return False, "Could not re-acquire lock."
                    self.agent.start('init_lakeshore')
                    self.agent.wait('init_lakeshore')

                # Only get readings if connected
                if self.initialized:
                    session.data.update({'connection': {'last_attempt': time.time(),
                                                        'connected': True}})

                    for chan in self.module.channels:
                        # Read sensor on channel
                        chan_string = "Channel_{}".format(chan.channel_num)

                        try:
                            temp_reading = chan.get_reading(unit='K')
                            sensor_reading = chan.get_reading(unit='S')
                        except Exception as e:
                            # Any read failure is treated as a lost
                            # connection; the next pass tries to re-init.
                            self.log.warn('Caught unexpected {error} during read:',
                                          error=type(e).__name__)
                            self.log.warn('  {err}', err=e)
                            self.log.warn('No reponse. Check your connection.')
                            self.initialized = False
                            time.sleep(1)
                            break

                        # For data feed
                        data['data'][chan_string + '_T'] = temp_reading
                        data['data'][chan_string + '_V'] = sensor_reading

                        # For session.data
                        field_dict = {chan_string: {"T": temp_reading, "V": sensor_reading}}
                        session.data['fields'].update(field_dict)

                    session.data.update({'timestamp': current_time})

                # Continue trying to connect
                if not self.initialized:
                    session.data.update({'connection': {'last_attempt': time.time(),
                                                        'connected': False}})
                    self.log.info('Trying to reconnect.')
                    time.sleep(1)
                    continue

                self.agent.publish_to_feed('temperatures', data)

                time.sleep(sleep_time)

            self.agent.feeds['temperatures'].flush_buffer()

        return True, 'Acquisition exited cleanly.'

    def _stop_acq(self, session, params=None):
        """Stop the acq process.

        Clears the ``take_data`` flag that the ``acq`` loop polls.

        Returns:
            tuple: ``(True, message)`` if acquisition was running,
            ``(False, message)`` otherwise.
        """
        if self.take_data:
            self.take_data = False
            return True, 'requested to stop taking data.'
        else:
            return False, 'acq is not currently running'
def make_parser(parser=None):
    """Build the command-line parser for the Lakeshore 240 agent.

    Args:
        parser (argparse.ArgumentParser, optional): Existing parser to add
            the 'Agent Options' group to. A new parser is created if None.

    Returns:
        argparse.ArgumentParser: The parser with the agent options added.
    """
    if parser is None:
        parser = argparse.ArgumentParser()

    pgroup = parser.add_argument_group('Agent Options')
    pgroup.add_argument('--serial-number', type=str,
                        help="Serial number of your Lakeshore240 device")
    pgroup.add_argument('--port', type=str,
                        help="Path to USB node for the lakeshore")
    pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'],
                        help="Starting action for the agent.")
    pgroup.add_argument('--sampling-frequency', type=float,
                        help="Sampling frequency for data acquisition")

    return parser


def _find_device_port(port, serial_number):
    """Resolve the device node to use for the Lakeshore 240.

    Args:
        port (str or None): Explicit device path; returned unchanged when
            given.
        serial_number (str or None): Device serial number used to search
            for the port when ``port`` is None.

    Returns:
        str or None: The device path, or None if it cannot be determined
        (including when neither argument is provided).
    """
    if port is not None:
        return port

    # Without a serial number there is nothing to search for; an empty
    # string would also match every entry below.
    if not serial_number:
        return None

    # Tries to find correct USB port automatically.
    # This exists if udev rules are setup properly for the 240s
    udev_path = '/dev/{}'.format(serial_number)
    if os.path.exists(udev_path):
        return udev_path

    if os.path.exists('/dev/serial/by-id'):
        for entry in os.listdir('/dev/serial/by-id'):
            if serial_number in entry:
                device_port = "/dev/serial/by-id/{}".format(entry)
                print("Found port {}".format(device_port))
                return device_port

    return None


def main(args=None):
    """Parse options, locate the device port and run the agent.

    Args:
        args (list, optional): Argument list handed to
            ``site_config.parse_args``.

    Returns:
        None. Returns early, after printing a message, when no device port
        can be determined.
    """
    # Start logging
    txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))

    parser = make_parser()

    # Not used anymore, but we don't want it to break the agent if these
    # args are passed
    parser.add_argument('--fake-data', help=argparse.SUPPRESS)
    parser.add_argument('--num-channels', help=argparse.SUPPRESS)

    # Interpret options in the context of site_config.
    args = site_config.parse_args(agent_class='Lakeshore240Agent',
                                  parser=parser,
                                  args=args)

    if args.fake_data is not None:
        warnings.warn("WARNING: the --fake-data parameter is deprecated, please "
                      "remove from your site-config file", DeprecationWarning)

    if args.num_channels is not None:
        warnings.warn("WARNING: the --num-channels parameter is deprecated, please "
                      "remove from your site-config file", DeprecationWarning)

    # Automatically acquire data if requested (default)
    init_params = False
    if args.mode == 'init':
        init_params = {'auto_acquire': False}
    elif args.mode == 'acq':
        init_params = {'auto_acquire': True}

    device_port = _find_device_port(args.port, args.serial_number)

    if device_port is None:
        print("Could not find device port for {}".format(args.serial_number))
        return

    agent, runner = ocs_agent.init_site_agent(args)

    kwargs = {
        'port': device_port
    }

    if args.sampling_frequency is not None:
        kwargs['f_sample'] = float(args.sampling_frequency)

    therm = LS240_Agent(agent, **kwargs)

    agent.register_task('init_lakeshore', therm.init_lakeshore,
                        startup=init_params)
    agent.register_task('set_values', therm.set_values)
    agent.register_task('get_values', therm.get_values)
    agent.register_task('upload_cal_curve', therm.upload_cal_curve)
    agent.register_process('acq', therm.acq, therm._stop_acq)

    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()