Source code for socs.agents.orientalmotor_blh.agent

#!/usr/bin/env python3
'''OCS agent for BLH motor driver
'''
import argparse
import os
import time

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock

from socs.agents.orientalmotor_blh.drivers import BLH

PORT_DEFAULT = '/dev/ttyACM0'
LOCK_RELEASE_SEC = 1.
LOCK_RELEASE_TIMEOUT = 10
ACQ_TIMEOUT = 100
INIT_TIMEOUT = 100


[docs] class BLHAgent: """OCS agent class for BLH motor driver Parameters ---------- port : string Port to connect, default to /dev/ttyACM0 """ def __init__(self, agent, port=PORT_DEFAULT): self.active = True self.agent = agent self.log = agent.log self.lock = TimeoutLock() self.take_data = False self._blh = BLH(port=port) self.initialized = False agg_params = {'frame_length': 60} self.agent.register_feed('motor', record=True, agg_params=agg_params, buffer_time=1) self.speed = 0.0
[docs] @ocs_agent.param('_') def init_blh(self, session, params): """init_blh() **Task** - Initialize motor driver. """ if self.initialized: return True, 'Already initialized' with self.lock.acquire_timeout(0, job='init_blh') as acquired: if not acquired: self.log.warn('Could not start init because ' '{} is already running'.format(self.lock.job)) return False, 'Could not acquire lock.' self._blh.connect() session.add_message('BLH initialized.') self.initialized = True return True, 'BLH module initialized.'
[docs] @ocs_agent.param('sampling_frequency', default=2.5, type=float) def acq(self, session, params): """acq() **Process** - Monitor status of the motor. Parameters ---------- sampling_frequency : float, optional Sampling frequency in Hz, defaults to 2.5 Hz Notes ----- An example of the session data:: >>> response.session['data'] {'RPM': 0.0, 'timestamp': 1736541796.779634 } """ f_sample = params['sampling_frequency'] pace_maker = Pacemaker(f_sample) if not self.initialized: self.agent.start('init_blh') for _ in range(INIT_TIMEOUT): if self.initialized: break time.sleep(0.1) if not self.initialized: return False, 'Could not initialize..' with self.lock.acquire_timeout(timeout=0, job='acq') as acquired: if not acquired: self.log.warn( f'Could not start acq because {self.lock.job} is already running') return False, 'Could not acquire lock.' self.take_data = True session.data = {} last_release = time.time() while self.take_data: # Release lock if time.time() - last_release > LOCK_RELEASE_SEC: last_release = time.time() if not self.lock.release_and_acquire(timeout=LOCK_RELEASE_TIMEOUT): print(f'Re-acquire failed: {self.lock.job}') return False, 'Could not re-acquire lock.' # Data acquisition current_time = time.time() data = {'timestamp': current_time, 'block_name': 'motor', 'data': {}} speed, error = self._blh.get_status() data['data']['RPM'] = speed data['data']['error'] = error self.speed = speed field_dict = {'RPM': speed, 'error': error} session.data.update(field_dict) self.agent.publish_to_feed('motor', data) session.data.update({'timestamp': current_time}) pace_maker.sleep() self.agent.feeds['motor'].flush_buffer() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params): self.agent.start('stop_rotation') while self.speed > 0.1: time.sleep(1) if self.take_data: self.take_data = False return True, 'requested to stop taking data.' return False, 'acq is not currently running.'
[docs] @ocs_agent.param('speed', default=None, type=int, check=lambda x: 50 <= x <= 3000) @ocs_agent.param('accl_time', default=None, type=float, check=lambda x: 0.5 <= x <= 15) @ocs_agent.param('decl_time', default=None, type=float, check=lambda x: 0.5 <= x <= 15) def set_values(self, session, params): """set_values(speed=None, accl_time=None, decl_time=None) **Task** - Set parameters for BLH motor driver. Parameters ---------- speed : int, optional Motor rotation speed in RPM. Values must be in range [50, 3000]. accl_time : float, optional Acceleration time. Values must be in range [0.5, 15] decl_time : float, optional Deceleration time. Values must be in range [0.5, 15] """ with self.lock.acquire_timeout(3, job='set_values') as acquired: if not acquired: self.log.warn('Could not start set_values because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' speed = params['speed'] accl_time = params['accl_time'] decl_time = params['decl_time'] if speed is not None: self._blh.set_speed(speed) if accl_time is not None: self._blh.set_accl_time(accl_time, accl=True) if decl_time is not None: self._blh.set_accl_time(decl_time, accl=False) return True, 'Set values for BLH'
[docs] @ocs_agent.param('forward', default=True, type=bool) def start_rotation(self, session, params): """start_rotation(forward=True) **Task** - Start motor rotation. Parameters ---------- forward : bool, default True Move forward if True """ if not self.take_data: self.agent.start('acq') for _ in range(ACQ_TIMEOUT): if self.take_data: break time.sleep(0.1) if not self.take_data: return False, 'Could not start acq.' with self.lock.acquire_timeout(3, job='set_values') as acquired: if not acquired: self.log.warn('Could not start set_values because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' if not self.take_data: return False, 'acq is not currently running.' forward = params.get('forward') if forward is None: forward = True result = self._blh.start(forward=forward) if not result: return False, 'Could not start rotation.' return True, 'BLH rotation started.'
[docs] @ocs_agent.param('_') def stop_rotation(self, session, params): """stop_rotation() **Task** - Stop motor rotation. """ with self.lock.acquire_timeout(3, job='set_values') as acquired: if not acquired: self.log.warn('Could not start set_values because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' result = self._blh.stop() if not result: return False, 'Could not stop rotation.' return True, 'BLH rotation stop command was published.'
[docs] @ocs_agent.param('_') def clear_alarm(self, session, params): """clear_alarm() **Task** - Clear alarm of the driver. """ with self.lock.acquire_timeout(3, job='set_values') as acquired: if not acquired: self.log.warn('Could not start clear_alarm because ' f'{self.lock.job} is already running') return False, 'Could not acquire lock.' self._blh.clear_alarm() _, error = self._blh.get_status() if error != 0: return False, 'Could not clear alarm.' return True, 'Alarm clear command was published.'
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--port', default=PORT_DEFAULT) return parser def main(args=None): """Boot OCS agent""" txaio.start_logging(level=os.environ.get('LOGLEVEL', 'info')) parser = make_parser() args = site_config.parse_args(agent_class='BLHAgent', parser=parser, args=args) agent_inst, runner = ocs_agent.init_site_agent(args) blh_agent = BLHAgent(agent_inst, port=args.port) agent_inst.register_task( 'set_values', blh_agent.set_values ) agent_inst.register_task( 'start_rotation', blh_agent.start_rotation ) agent_inst.register_task( 'stop_rotation', blh_agent.stop_rotation ) agent_inst.register_task( 'init_blh', blh_agent.init_blh ) agent_inst.register_task( 'clear_alarm', blh_agent.clear_alarm ) agent_inst.register_process( 'acq', blh_agent.acq, blh_agent._stop_acq, startup=True ) runner.run(agent_inst, auto_reconnect=True) if __name__ == '__main__': main()