Source code for socs.agents.lakeshore240.agent

import argparse
import os
import time
import warnings
from typing import Optional

import txaio

from socs.Lakeshore.Lakeshore240 import Module

on_rtd = os.environ.get('READTHEDOCS') == 'True'
if not on_rtd:
    from ocs import ocs_agent, site_config
    from ocs.ocs_twisted import TimeoutLock


[docs] class LS240_Agent: def __init__(self, agent, port="/dev/ttyUSB0", f_sample=2.5): self.agent: ocs_agent.OCSAgent = agent self.log = agent.log self.lock = TimeoutLock() self.port = port self.module: Optional[Module] = None self.f_sample = f_sample self.initialized = False self.take_data = False # Registers Temperature and Voltage feeds agg_params = { 'frame_length': 60, } self.agent.register_feed('temperatures', record=True, agg_params=agg_params, buffer_time=1) # Task functions.
[docs] def init_lakeshore(self, session, params=None): """init_lakeshore(auto_acquire=False) **Task** - Perform first time setup of the Lakeshore 240 Module. Parameters: auto_acquire (bool, optional): Starts data acquisition after initialization if True. Defaults to False. """ if params is None: params = {} auto_acquire = params.get('auto_acquire', False) if self.initialized: return True, "Already Initialized Module" with self.lock.acquire_timeout(0, job='init') as acquired: if not acquired: self.log.warn("Could not start init because " "{} is already running".format(self.lock.job)) return False, "Could not acquire lock." self.module = Module(port=self.port) print("Initialized Lakeshore module: {!s}".format(self.module)) session.add_message("Lakeshore initialized with ID: %s" % self.module.inst_sn) self.initialized = True # Start data acquisition if requested if auto_acquire: self.agent.start('acq') return True, 'Lakeshore module initialized.'
[docs] def set_values(self, session, params=None): """set_values(channel, sensor=None, auto_range=None, range=None,\ current_reversal=None, units=None, enabled=None, name=None) **Task** - Set sensor parameters for a Lakeshore240 Channel. Args: channel (int): Channel number to set. Valid choices are 1-8. sensor (int, optional): Specifies sensor type. See :func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for possible types. auto_range (int, optional): Specifies if channel should use autorange. Must be 0 or 1. range (int, optional): Specifies range if auto_range is false. Only settable for NTC RTD. See :func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for possible ranges. current_reversal (int, optional): Specifies if input current reversal is on or off. Always 0 if input is a diode. units (int, optional): Specifies preferred units parameter, and sets the units for alarm settings. See :func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for possible units. enabled (int, optional): Sets if channel is enabled. name (str, optional): Sets name of channel. """ if params is None: params = {} with self.lock.acquire_timeout(0, job='set_values') as acquired: if not acquired: self.log.warn("Could not start set_values because " "{} is already running".format(self.lock.job)) return False, "Could not acquire lock." self.module.channels[params['channel'] - 1].set_values( sensor=params.get('sensor'), auto_range=params.get('auto_range'), range=params.get('range'), current_reversal=params.get('current_reversal'), unit=params.get('unit'), enabled=params.get('enabled'), name=params.get('name'), ) return True, 'Set values for channel {}'.format(params['channel'])
[docs] def upload_cal_curve(self, session, params=None): """upload_cal_curve(channel, filename) **Task** - Upload a calibration curve to a channel. Parameters: channel (int): Channel number, 1-8. filename (str): Filename for calibration curve. """ channel = params['channel'] filename = params['filename'] with self.lock.acquire_timeout(0, job='upload_cal_curve') as acquired: if not acquired: self.log.warn("Could not start set_values because " "{} is already running".format(self.lock.job)) return False, "Could not acquire lock." channel = self.module.channels[channel - 1] self.log.info("Starting upload to channel {}...".format(channel)) channel.load_curve(filename) self.log.info("Finished uploading.") return True, "Uploaded curve to channel {}".format(channel)
[docs] def acq(self, session, params=None): """acq(sampling_frequency=2.5) **Process** - Start data acquisition. Parameters: sampling_frequency (float): Sampling frequency for data collection. Defaults to 2.5 Hz The most recent data collected is stored in session data in the structure:: >>> response.session['data'] {"fields": {"Channel_1": {"T": 99.26, "V": 99.42}, "Channel_2": {"T": 99.54, "V": 101.06}, "Channel_3": {"T": 100.11, "V":100.79}, "Channel_4": {"T": 98.49, "V": 100.77}, "Channel_5": {"T": 97.75, "V": 101.45}, "Channel_6": {"T": 99.58, "V": 101.75}, "Channel_7": {"T": 98.03, "V": 100.82}, "Channel_8": {"T": 101.14, "V":101.01}}, "timestamp":1601925677.6914878} """ if params is None: params = {} f_sample = params.get('sampling_frequency') # If f_sample is None, use value passed to Agent init if f_sample is None: f_sample = self.f_sample sleep_time = 1 / f_sample - 0.01 with self.lock.acquire_timeout(0, job='acq') as acquired: if not acquired: self.log.warn("Could not start acq because {} is already running" .format(self.lock.job)) return False, "Could not acquire lock." self.take_data = True session.data = {"fields": {}} while self.take_data: current_time = time.time() data = { 'timestamp': current_time, 'block_name': 'temps', 'data': {} } for chan in self.module.channels: # Read sensor on channel chan_string = "Channel_{}".format(chan.channel_num) temp_reading = chan.get_reading(unit='K') sensor_reading = chan.get_reading(unit='S') # For data feed data['data'][chan_string + '_T'] = temp_reading data['data'][chan_string + '_V'] = sensor_reading # For session.data field_dict = {chan_string: {"T": temp_reading, "V": sensor_reading}} session.data['fields'].update(field_dict) self.agent.publish_to_feed('temperatures', data) session.data.update({'timestamp': current_time}) time.sleep(sleep_time) self.agent.feeds['temperatures'].flush_buffer() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None): """ Stops acq process. """ if self.take_data: self.take_data = False return True, 'requested to stop taking data.' else: return False, 'acq is not currently running'
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--serial-number', type=str, help="Serial number of your Lakeshore240 device") pgroup.add_argument('--port', type=str, help="Path to USB node for the lakeshore") pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'], help="Starting action for the agent.") pgroup.add_argument('--sampling-frequency', type=float, help="Sampling frequency for data acquisition") return parser def main(args=None): # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) parser = make_parser() # Not used anymore, but we don't it to break the agent if these args are passed parser.add_argument('--fake-data', help=argparse.SUPPRESS) parser.add_argument('--num-channels', help=argparse.SUPPRESS) # Interpret options in the context of site_config. args = site_config.parse_args(agent_class='Lakeshore240Agent', parser=parser, args=args) if args.fake_data is not None: warnings.warn("WARNING: the --fake-data parameter is deprecated, please " "remove from your site-config file", DeprecationWarning) if args.num_channels is not None: warnings.warn("WARNING: the --num-channels parameter is deprecated, please " "remove from your site-config file", DeprecationWarning) # Automatically acquire data if requested (default) init_params = False if args.mode == 'init': init_params = {'auto_acquire': False} elif args.mode == 'acq': init_params = {'auto_acquire': True} device_port = None if args.port is not None: device_port = args.port else: # Tries to find correct USB port automatically # This exists if udev rules are setup properly for the 240s if os.path.exists('/dev/{}'.format(args.serial_number)): device_port = "/dev/{}".format(args.serial_number) elif os.path.exists('/dev/serial/by-id'): ports = os.listdir('/dev/serial/by-id') for port in ports: if args.serial_number in port: device_port = "/dev/serial/by-id/{}".format(port) print("Found port {}".format(device_port)) break if device_port is None: print("Could not find device port for {}".format(args.serial_number)) return agent, runner = ocs_agent.init_site_agent(args) kwargs = { 'port': device_port } if args.sampling_frequency is not None: kwargs['f_sample'] = float(args.sampling_frequency) therm = LS240_Agent(agent, **kwargs) agent.register_task('init_lakeshore', therm.init_lakeshore, startup=init_params) agent.register_task('set_values', therm.set_values) agent.register_task('upload_cal_curve', therm.upload_cal_curve) agent.register_process('acq', therm.acq, therm._stop_acq) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()