Source code for socs.agents.stimulator_thermometer.agent

#!/usr/bin/env python3
"""OCS agent for thermometer board in the stimulator box
"""
import argparse
import os
import time

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock

from socs.agents.stimulator_thermometer.drivers import (Max31856,
                                                        from_spi_node_path)

LOCK_RELEASE_SEC = 1.
LOCK_RELEASE_TIMEOUT = 10


[docs] class StimThermometerAgent: """OCS agent class for stimulator thermometer. Parameters ---------- paths_spinode : list of str or pathlib.Path List of path to SPI nodes of temperature devices. """ def __init__(self, agent, paths_spinode): self.active = True self.agent = agent self.log = agent.log self.lock = TimeoutLock() self.take_data = False self.devices = [from_spi_node_path(path) for path in paths_spinode] self.initialized = False agg_params = {'frame_length': 60} self.agent.register_feed('temperatures', record=True, agg_params=agg_params, buffer_time=1)
[docs] @ocs_agent.param('sampling_frequency', type=float, default=1) def acq(self, session, params): """acq() **Process** - Fetch temperatures from the thermometer readers. Parameters ---------- sampling_frequency : float, optional Sampling frequency in Hz, default to 1 Hz. Notes ----- An example of the session data:: >>> response.session['data'] {"Channel_0": {"T":15.092116135817285}, "Channel_1": {"T":15.092116135817285}, "Channel_2": {"T":15.40625,"T_cj":16.796875}, "Channel_3": {"T":15.4921875,"T_cj":16.453125}}, ... "timestamp": 1738827824.2523127 } """ f_sample = params['sampling_frequency'] pace_maker = Pacemaker(f_sample) with self.lock.acquire_timeout(timeout=0, job='acq') as acquired: if not acquired: self.log.warn( f'Could not start acq because {self.lock.job} is already running') return False, 'Could not acquire lock.' self.take_data = True session.data = {} last_release = time.time() while self.take_data: if time.time() - last_release > LOCK_RELEASE_SEC: last_release = time.time() if not self.lock.release_and_acquire(timeout=LOCK_RELEASE_TIMEOUT): print(f'Re-acquire failed: {self.lock.job}') return False, 'Could not re-acquire lock.' current_time = time.time() for ch_num, dev in enumerate(self.devices): data = {'timestamp': current_time, 'block_name': f'temp_{ch_num}', 'data': {}} chan_string = f'Channel_{ch_num}' temp = dev.get_temp() data['data'][chan_string + '_T'] = temp if isinstance(dev, Max31856): cjtemp = dev.get_temp_ambient() data['data'][chan_string + '_T_cj'] = cjtemp field_dict = {chan_string: {'T': temp, 'T_cj': cjtemp}} else: field_dict = {chan_string: {'T': temp}} self.agent.publish_to_feed('temperatures', data) session.data.update(field_dict) session.data.update({'timestamp': current_time}) pace_maker.sleep() self.agent.feeds['temperatures'].flush_buffer() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None): if self.take_data: self.take_data = False return True, 'requested to stop taking data.' return False, 'acq is not currently running.'
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--paths-spinode', nargs='*', type=str, required=True, help="Path to the spi nodes.") return parser def main(args=None): """Boot OCS agent""" txaio.start_logging(level=os.environ.get('LOGLEVEL', 'info')) parser = make_parser() args = site_config.parse_args('StimThermometerAgent', parser=parser, args=args) agent_inst, runner = ocs_agent.init_site_agent(args) stim_max_agent = StimThermometerAgent(agent_inst, args.paths_spinode) agent_inst.register_process( 'acq', stim_max_agent.acq, stim_max_agent._stop_acq, startup=True ) runner.run(agent_inst, auto_reconnect=True) if __name__ == '__main__': main()