Source code for socs.agents.scpi_psu.agent

import argparse
import os
import socket
import time
from typing import Optional

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock

from socs.agents.scpi_psu.drivers import PsuInterface, ScpiPsuInterface

# For logging
txaio.use_twisted()


[docs] class ScpiPsuAgent: def __init__(self, agent, ip_address, gpib_slot=None, port=None): self.agent = agent self.log = agent.log self.lock = TimeoutLock() self.job = None self.ip_address = ip_address self.gpib_slot = None self.port = None self.monitor = False self.psu: Optional[ScpiPsuAgent] = None if gpib_slot is not None: self.gpib_slot = int(gpib_slot) if port is not None: self.port = int(port) # Registers Temperature and Voltage feeds agg_params = { 'frame_length': 10 * 60, } self.agent.register_feed('psu_output', record=True, agg_params=agg_params, buffer_time=0)
[docs] @ocs_agent.param('auto_acquire', default=False, type=bool) def init(self, session, params=None): """init() **Task** - Initialize connection to the power supply. """ with self.lock.acquire_timeout(0) as acquired: if not acquired: return False, "Could not acquire lock" if self.gpib_slot is None and self.port is None: self.log.error('Either --gpib-slot or --port must be specified') return False, "Parameter not set" elif self.port is None: # Use the old Prologix-based GPIB code try: self.psu = PsuInterface(self.ip_address, self.gpib_slot) self.idn = self.psu.identify() except socket.timeout as e: self.log.error(f"PSU timed out during connect: {e}") return False, "Timeout" else: # Use the new direct ethernet connection code try: self.psu = ScpiPsuInterface(self.ip_address, port=self.port) self.idn = self.psu.identify() except socket.timeout as e: self.log.error(f"PSU timed out during connect: {e}") return False, "Timeout" except ValueError as e: if (e.args[0].startswith('Model number')): self.log.warn(f"PSU initialization: {e}. \ Number of channels defaults to 3. \ Suggest appending {e.args[-1]} to the list \ of known model numbers in scpi_psu/drivers.py") else: self.log.error(f"PSU initialization resulted in unknown ValueError: {e}") return False, "ValueError" self.log.info("Connected to psu: {}".format(self.idn)) auto_acquire = params.get('auto_acquire', False) if auto_acquire: acq_params = None if self.psu.num_channels != 0: acq_params = {'channels': [i + 1 for i in range(self.psu.num_channels)]} self.agent.start('monitor_output', acq_params) return True, 'Initialized PSU.'
def _initialize_module(self): """Initialize the ScpiPsu module.""" try: if self.port is None: self.psu = PsuInterface(self.ip_address, self.gpib_slot) else: self.psu = ScpiPsuInterface(self.ip_address, port=self.port) except (socket.timeout, OSError) as e: self.log.warn(f"Error establishing connection: {e}") self.psu = None return False self.idn = self.psu.identify() self.log.info("Connected to psu: {}".format(self.idn)) self.log.info("Clearing event registers and error queue") self.psu.clear() return True
[docs] @ocs_agent.param('wait', type=float, default=1) @ocs_agent.param('channels', type=list, default=[1, 2, 3]) @ocs_agent.param('test_mode', type=bool, default=False) def monitor_output(self, session, params=None): """monitor_output(wait=1, channels=[1, 2, 3], test_mode=False) **Process** - Continuously monitor PSU output current and voltage. Parameters: wait (float, optional): Time to wait between measurements [seconds]. channels (list[int], optional): Channels to monitor. [1, 2, 3] by default. test_mode (bool, optional): Exit process after single loop if True. Defaults to False. """ self.monitor = True while self.monitor: time.sleep(params['wait']) with self.lock.acquire_timeout(1) as acquired: if not acquired: self.log.warn("Could not acquire in monitor_current") continue if not self.psu: self._initialize_module() continue data = { 'timestamp': time.time(), 'block_name': 'output', 'data': {} } try: for chan in params['channels']: if self.psu.get_output(chan): data['data']["Voltage_{}".format(chan)] = self.psu.get_volt(chan) data['data']["Current_{}".format(chan)] = self.psu.get_curr(chan) else: self.log.warn("Cannot measure output when output is disabled") self.monitor = False return False, "Cannot measure output when output is disabled" except socket.timeout as e: self.log.warn(f"TimeoutError: {e}") self.log.info("Attempting to reconnect") self.psu = None continue self.agent.publish_to_feed('psu_output', data) # Allow this process to be queried to return current data session.data = data if params['test_mode']: break return True, "Finished monitoring current"
def stop_monitoring(self, session, params=None): self.monitor = False return True, "Stopping current monitor"
[docs] @ocs_agent.param('channel', type=int, choices=[1, 2, 3]) def get_voltage(self, session, params=None): """get_voltage(channel) **Task** - Measure and return the voltage of the power supply. Parameters: channel (int): Channel number (1, 2, or 3). Examples: Example for calling in a client:: client.get_voltage(channel=1) Notes: An example of the session data:: >>> response.session['data'] {'timestamp': 1723671503.4899583, 'channel': 1, 'voltage': 0.0512836} """ chan = params['channel'] with self.lock.acquire_timeout(1) as acquired: if acquired: if self.psu.get_output(chan): data = { 'timestamp': time.time(), 'channel': chan, 'voltage': self.psu.get_volt(chan) } session.data = data else: return False, "Cannot measure output when output is disabled." else: return False, "Could not acquire lock" return True, 'Channel {} voltage measured'.format(chan)
[docs] @ocs_agent.param('channel', type=int, choices=[1, 2, 3]) @ocs_agent.param('volts', type=float, check=lambda x: 0 <= x <= 30) def set_voltage(self, session, params=None): """set_voltage(channel, volts) **Task** - Set the voltage of the power supply. Parameters: channel (int): Channel number (1, 2, or 3). volts (float): Voltage to set. Must be between 0 and 30. """ with self.lock.acquire_timeout(1) as acquired: if acquired: self.psu.set_volt(params['channel'], params['volts']) else: return False, "Could not acquire lock" return True, 'Set channel {} voltage to {}'.format(params['channel'], params['volts'])
[docs] @ocs_agent.param('channel', type=int, choices=[1, 2, 3]) def get_current(self, session, params=None): """get_current(channel) **Task** - Measure and return the current of the power supply. Parameters: channel (int): Channel number (1, 2, or 3). Examples: Example for calling in a client:: client.get_current(channel=1) Notes: An example of the session data:: >>> response.session['data'] {'timestamp': 1723671503.4899583, 'channel' : 1, 'current': 0.0103236} """ chan = params['channel'] with self.lock.acquire_timeout(1) as acquired: if acquired: # Check if output is enabled before measuring if self.psu.get_output(chan): data = { 'timestamp': time.time(), 'channel': chan, 'current': self.psu.get_curr(chan) } session.data = data else: return False, "Cannot measure output when output is disabled." else: return False, "Could not acquire lock" return True, 'Channel {} current measured'.format(chan)
[docs] @ocs_agent.param('channel', type=int, choices=[1, 2, 3]) @ocs_agent.param('current', type=float) def set_current(self, session, params=None): """set_current(channel, current) **Task** - Set the current of the power supply. Parameters: channel (int): Channel number (1, 2, or 3). current (float): Current to set. """ with self.lock.acquire_timeout(1) as acquired: if acquired: self.psu.set_curr(params['channel'], params['current']) else: return False, "Could not acquire lock" return True, 'Set channel {} current to {}'.format(params['channel'], params['current'])
[docs] @ocs_agent.param('channel', type=int, choices=[1, 2, 3]) def get_output(self, session, params=None): """get_output(channel) **Task** - Check if channel ouput is enabled or disabled. Parameters: channel (int): Channel number (1, 2, or 3). Examples: Example for calling in a client:: client.get_output(channel=1) Notes: An example of the session data:: >>> response.session['data'] {'timestamp': 1723671503.4899583, 'channel': 1, 'state': 1} """ chan = params['channel'] enabled = False data = {} with self.lock.acquire_timeout(1) as acquired: if acquired: data = { 'timestamp': time.time(), 'channel': chan, 'state': self.psu.get_output(chan) } session.data = data else: return False, "Could not acquire lock." if enabled: return True, 'Channel {} output is currently enabled.'.format(chan) else: return True, 'Channel {} output is currently disabled.'.format(chan)
[docs] @ocs_agent.param('channel', type=int, choices=[1, 2, 3]) @ocs_agent.param('state', type=bool) def set_output(self, session, params=None): """set_output(channel, state) **Task** - Turn a channel on or off. Parameters: channel (int): Channel number (1, 2, or 3). state (bool): True for on, False for off. """ with self.lock.acquire_timeout(1) as acquired: if acquired: self.psu.set_output(params['channel'], params['state']) else: return False, "Could not acquire lock" return True, 'Initialized PSU.'
def make_parser(parser=None): """Build the argument parser for the Agent. Allows sphinx to automatically build documentation based on this function. """ if parser is None: parser = argparse.ArgumentParser() # Add options specific to this agent. pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--ip-address') pgroup.add_argument('--gpib-slot') pgroup.add_argument('--port') pgroup.add_argument('--mode', type=str, default='acq', choices=['init', 'acq']) return parser def main(args=None): # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) parser = make_parser() args = site_config.parse_args(agent_class='ScpiPsuAgent', parser=parser, args=args) init_params = False if args.mode == 'acq': init_params = {'auto_acquire': True} agent, runner = ocs_agent.init_site_agent(args) p = ScpiPsuAgent(agent, args.ip_address, args.gpib_slot, port=args.port) agent.register_task('init', p.init, startup=init_params) agent.register_task('set_voltage', p.set_voltage) agent.register_task('set_current', p.set_current) agent.register_task('set_output', p.set_output) agent.register_task('get_voltage', p.get_voltage) agent.register_task('get_current', p.get_current) agent.register_task('get_output', p.get_output) agent.register_process('monitor_output', p.monitor_output, p.stop_monitoring) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()