Source code for socs.agents.hwp_pcu.agent

import argparse
import time
from dataclasses import dataclass
from queue import Queue

import txaio
from twisted.internet import defer, reactor, threads

txaio.use_twisted()

from ocs import ocs_agent, site_config

import socs.agents.hwp_pcu.drivers.hwp_pcu as pcu


class Actions:
    class BaseAction:
        def __post_init__(self):
            self.deferred = defer.Deferred()
            self.log = txaio.make_logger()

    @dataclass
    class SendCommand (BaseAction):
        command: str


def process_action(action, PCU: pcu.PCU):
    """Process an action with PCU hardware"""
    if isinstance(action, Actions.SendCommand):
        PCU.send_command(action.command)
        action.log.info(f"Command: {action.command}")


[docs] class HWPPCUAgent: """Agent to phase compensation improve the CHWP motor efficiency Args: agent (ocs.ocs_agent.OCSAgent): Instantiated OCSAgent class for this agent port (str): Path to USB device in '/dev/' """ def __init__(self, agent, port): self.agent: ocs_agent.OCSAgent = agent self.log = agent.log self.port = port self.action_queue = Queue() agg_params = {'frame_length': 60} self.agent.register_feed( 'hwppcu', record=True, agg_params=agg_params)
[docs] @defer.inlineCallbacks @ocs_agent.param('command', default='off', type=str, choices=['off', 'on_1', 'on_2', 'stop']) def send_command(self, session, params): """send_command(command) **Task** - Send commands to the phase compensation unit. off: The compensation phase is zero. on_1: The compensation phase is +120 deg. on_2: The compensation phase is -120 deg. stop: Stop the HWP spin. Parameters: command (str): set the operation mode from 'off', 'on_1', 'on_2' or 'stop'. """ action = Actions.SendCommand(**params) self.action_queue.put(action) session.data = yield action.deferred return True, f"Set relays for cmd={action.command}"
def _process_actions(self, PCU: pcu.PCU): while not self.action_queue.empty(): action = self.action_queue.get() try: self.log.info(f"Running action {action}") res = process_action(action, PCU) reactor.callFromThread(action.deferred.callback, res) except Exception as e: self.log.error(f"Error processing action: {action}") reactor.callFromThread(action.deferred.errback, e) def _get_and_publish_data(self, PCU: pcu.PCU, session): now = time.time() data = {'timestamp': now, 'block_name': 'hwppcu', 'data': {}} status = PCU.get_status() data['data']['status'] = status self.agent.publish_to_feed('hwppcu', data) session.data = {'status': status, 'last_updated': now} if status in ['failed', 'undefined']: PCU.clear_buffer() self.log.warn(f'Status is {status}, cleared buffer') def _clear_queue(self): while not self.action_queue.empty(): action = self.action_queue.get() action.deferred.errback(Exception("Action cancelled"))
[docs] def main(self, session, params): """ **Process** - Main process for PCU agent. """ PCU = None threads.blockingCallFromThread(reactor, self._clear_queue) last_daq = 0 while session.status in ['starting', 'running']: if PCU is None: try: PCU = pcu.PCU(port=self.port) self.log.info('Connected to PCU') PCU.clear_buffer() self.log.info('Cleared buffer') except ConnectionRefusedError: self.log.error( "Could not connect to PCU. " "Retrying after 30 sec..." ) time.sleep(30) continue now = time.time() if now - last_daq > 5: self._get_and_publish_data(PCU, session) last_daq = now self._process_actions(PCU) time.sleep(0.1) PCU.close()
def _stop_main(self, session, params): """ Stop acq process. """ session.set_status('stopping') return True, 'Set main status to stopping'
def make_parser(parser=None): """ Build the argument parser for the Agent. Allows sphinx to automatically build documentation baised on this function """ if parser is None: parser = argparse.ArgumentParser() # Add options specific to this agent pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--port', type=str, help="Path to USB node for the PCU") return parser def main(args=None): parser = make_parser() args = site_config.parse_args(agent_class='HWPPCUAgent', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) hwppcu_agent = HWPPCUAgent(agent, port=args.port) agent.register_task('send_command', hwppcu_agent.send_command, blocking=False) agent.register_process( 'main', hwppcu_agent.main, hwppcu_agent._stop_main, startup=True) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()