Source code for socs.agents.kikusui_pcr500ma.agent

#!/usr/bin/env python3
import argparse
import os
import time

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock

from socs.agents.kikusui_pcr500ma.drivers import PCR500MA, VOLT_ULIM_SOFT

PORT_DEFAULT = 5025

LOCK_RELEASE_SEC = 1.
LOCK_RELEASE_TIMEOUT = 10
ACQ_TIMEOUT = 5


[docs] class PCR500MAAgent: """OCS agent class for PCR500MA current source Parameters ---------- ip : string IP address port : int Port number """ def __init__(self, agent, ip_addr, port=PORT_DEFAULT): self.active = True self.agent = agent self.log = agent.log self.lock = TimeoutLock() self.take_data = False self._dev = PCR500MA(ip_addr=ip_addr, port=port) self.initialized = False agg_params = {'frame_length': 60} self.agent.register_feed('heater_source', record=True, agg_params=agg_params, buffer_time=1)
[docs] @ocs_agent.param('sampling_frequency', default=0.2, type=float) def acq(self, session, params): """acq(sampling_frequency=0.2) **Process** - Monitor status of the relay. Parameters ---------- sampling_frequency : float, optional Sampling frequency in Hz, defaults to 0.2 Hz. Notes ----- An example of the session data:: >>> response.session['data'] {"I_AC": 0.00475843, "V_AC": 0.0371215, "P_AC": 0.0, "f_AC": 60.0, "output": 0, "timestamp": 1737367649.2595236 } """ f_sample = params.get('sampling_frequency') pace_maker = Pacemaker(f_sample) with self.lock.acquire_timeout(timeout=0, job='acq') as acquired: if not acquired: self.log.warn( f'Could not start acq because {self.lock.job} is already running') return False, 'Could not acquire lock.' self.take_data = True last_release = time.time() while self.take_data: # Release lock if time.time() - last_release > LOCK_RELEASE_SEC: last_release = time.time() if not self.lock.release_and_acquire(timeout=LOCK_RELEASE_TIMEOUT): print(f'Re-acquire failed: {self.lock.job}') return False, 'Could not re-acquire lock.' # Data acquisition current_time = time.time() data = {'timestamp': current_time, 'block_name': 'heater_source', 'data': {}} try: i_ac = self._dev.meas_current_ac() v_ac = self._dev.meas_volt_ac() p_ac = self._dev.meas_power_ac() f_ac = self._dev.meas_freq() sw_status = self._dev.get_output() if session.degraded: self.log.info('Connection re-established.') session.degraded = False except ConnectionError: self.log.error('Failed to get data from Kikusui PCR. Check network connection.') session.degraded = True time.sleep(1) continue data['data']['I_AC'] = i_ac data['data']['V_AC'] = v_ac data['data']['P_AC'] = p_ac data['data']['f_AC'] = f_ac data['data']['output'] = 1 if sw_status else 0 field_dict = {'I_AC': i_ac, 'V_AC': v_ac, 'P_AC': p_ac, 'f_AC': f_ac, 'output': 1 if sw_status else 0} session.data.update(field_dict) self.agent.publish_to_feed('heater_source', data) session.data.update({'timestamp': current_time}) pace_maker.sleep() self.agent.feeds['heater_source'].flush_buffer() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None): if self.take_data: self.take_data = False return True, 'requested to stop taking data.' return False, 'acq is not currently running.'
[docs] @ocs_agent.param('output', type=bool) @ocs_agent.param('force', default=False, type=bool) def set_output(self, session, params=None): """set_output(output, force=False) **Task** - Turns the output on or off. Parameters ---------- output : bool True : output on False : output off force : bool, default False Force output on / off without checking output value. Notes ----- This function measures the output voltage when turning off and checks the voltage setting when turning on by default. If the voltage is too high it will not change state to avoid an abrupt change in applied voltage. Set `force` option to `True` when this behavior is unwanted. """ with self.lock.acquire_timeout(ACQ_TIMEOUT, job='set_output') as acquired: if not acquired: self.log.warn('Could not start set_output because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' output_on = params['output'] force = params['force'] # Use measurement value when checking turning off threshold. use_meas_val = not output_on if force: self._dev.set_output(output=output_on) else: if use_meas_val: volt_ac = self._dev.meas_volt_ac() if volt_ac >= 0.2: return False, 'Voltage too high to turn on/off.' else: volt_set = self._dev.get_volt_ac() if volt_set != 0: return False, 'Voltage too high to turn on/off.' self._dev.set_output(output=output_on) return True, f'Set output for PCR: {params["output"]}'
[docs] @ocs_agent.param('_') def get_output(self, session, params=None): """get_output() **Task** - Get output status. Notes ----- The most recent data collected is stored in session.data in the structure:: >>> response.session['data'] {'output': True, 'timestamp': 1737367649.2595236 } """ with self.lock.acquire_timeout(ACQ_TIMEOUT, job='get_output') as acquired: if not acquired: self.log.warn('Could not start get_output because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' d_status = self._dev.get_output() session.data = {'output': d_status, 'timestamp': time.time()} return True, f'Got output status of PCR: {d_status}'
[docs] @ocs_agent.param('volt_set', type=float, check=lambda x: 0 <= x <= VOLT_ULIM_SOFT) def set_volt_ac(self, session, params=None): """set_volt_ac(volt_set) **Task** - Set AC voltage value. Parameters ---------- volt_set : float AC voltage setting in V. Values must be in range [0, 51]. """ with self.lock.acquire_timeout(ACQ_TIMEOUT, job='set_volt_ac') as acquired: if not acquired: self.log.warn('Could not start set_volt_ac because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' self._dev.set_volt_ac(volt=params['volt_set']) return True, f'Set voltage for PCR: {params["volt_set"]}'
[docs] @ocs_agent.param('_') def get_volt_ac(self, session, params=None): """get_volt_ac() **Task** - Get AC voltage setting. Notes ----- The most recent data collected is stored in session.data in the structure:: >>> response.session['data'] {'volt_set': 10.0, 'timestamp': 1737367649.2595236 } """ with self.lock.acquire_timeout(ACQ_TIMEOUT, job='get_volt_ac') as acquired: if not acquired: self.log.warn('Could not start get_volt_ac because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' volt_set = self._dev.get_volt_ac() session.data = {'volt_set': volt_set, 'timestamp': time.time()} return True, f'Got AC voltage setting: {volt_set}'
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--port', default=PORT_DEFAULT, type=int, help='Port number for TCP communication.') pgroup.add_argument('--ip-address', help='IP address of the device.') return parser def main(args=None): """Boot OCS agent""" txaio.start_logging(level=os.environ.get('LOGLEVEL', 'info')) parser = site_config.add_arguments() parser = make_parser() args = site_config.parse_args(agent_class='PCR500MAAgent', parser=parser, args=args) agent_inst, runner = ocs_agent.init_site_agent(args) pcr_agent = PCR500MAAgent(agent_inst, ip_addr=args.ip_address, port=args.port) agent_inst.register_task( 'set_output', pcr_agent.set_output ) agent_inst.register_task( 'get_output', pcr_agent.get_output ) agent_inst.register_task( 'set_volt_ac', pcr_agent.set_volt_ac ) agent_inst.register_task( 'get_volt_ac', pcr_agent.get_volt_ac ) agent_inst.register_process( 'acq', pcr_agent.acq, pcr_agent._stop_acq, startup=True ) runner.run(agent_inst, auto_reconnect=True) if __name__ == '__main__': main()