Source code for socs.agents.pfeiffer_tpg366.agent

# Original script by Zhilei Xu and Tanay Bhandarkar.

import argparse
import os
import time

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock

from socs.agents.pfeiffer_tpg366.drivers import TPG366

# For logging
txaio.use_twisted()


class PfeifferAgent:
    """Agent that reads pressures from a Pfeiffer TPG366 and publishes them.

    On construction a ``TPG366`` driver object is created for the given
    address and the ``pressures`` feed is registered on the agent.

    Args:
        agent: OCS agent object. Its ``log`` is used for logging and it is
            used to register and publish to the ``pressures`` feed.
        ip_address: Address handed to the ``TPG366`` driver.
        port: Port handed to the ``TPG366`` driver after conversion with
            ``int()``.
        f_sample (float): Fallback sampling frequency [Hz], used by ``acq``
            only when its ``sampling_frequency`` parameter is None.
            Defaults to 1 Hz.

    """

    def __init__(self, agent, ip_address, port, f_sample=1.):
        self.active = True
        self.agent = agent
        self.log = agent.log
        self.lock = TimeoutLock()
        self.f_sample = f_sample
        # Loop flag for acq(); cleared by _stop_acq() to end the process.
        self.take_data = False
        self.gauge = TPG366(ip_address, int(port))

        agg_params = {'frame_length': 60, }

        self.agent.register_feed('pressures',
                                 record=True,
                                 agg_params=agg_params,
                                 buffer_time=1)

    @ocs_agent.param('sampling_frequency', type=float, default=2.5)
    @ocs_agent.param('test_mode', type=bool, default=False)
    def acq(self, session, params=None):
        """acq(sampling_frequency=2.5, test_mode=False)

        **Process** - Get pressures from the Pfeiffer gauges.

        Parameters:
            sampling_frequency (float): Rate at which to get the pressures
                [Hz]. Must be positive. Defaults to 2.5 Hz.
            test_mode (bool): Run the Process loop only once.
                This is meant only for testing. Defaults to False.

        Returns:
            tuple: ``(True, message)`` when the loop exits normally, or
            ``(False, message)`` if the sampling frequency is not positive
            or the lock could not be acquired.

        """
        f_sample = params['sampling_frequency']
        if f_sample is None:
            f_sample = self.f_sample

        # A zero or negative rate cannot be turned into a sleep interval
        # (division by zero / negative sleep), so fail cleanly up front.
        if f_sample <= 0:
            self.log.error(f"Invalid sampling frequency: {f_sample}. Must be positive.")
            return False, "Invalid sampling frequency."

        # The 0.01 s is subtracted from the nominal period (presumably to
        # offset per-iteration overhead -- TODO confirm). Clamp at zero so
        # rates above 100 Hz do not hand time.sleep() a negative value,
        # which would raise ValueError.
        sleep_time = max(0., 1. / f_sample - 0.01)

        with self.lock.acquire_timeout(timeout=0, job='init') as acquired:
            # Locking mechanism stops code from proceeding if no lock acquired
            if not acquired:
                self.log.warn("Could not start init because {} is already running".format(self.lock.job))
                return False, "Could not acquire lock."

            self.take_data = True

            while self.take_data:
                data = {
                    'timestamp': time.time(),
                    'block_name': 'pressures',
                    'data': {}
                }

                # Useful for debugging, but should separate to a task to cut
                # down on queries in the main acq() loop.
                # self.gauge.channel_power()
                try:
                    pressure_array = self.gauge.read_pressure_all()
                    if session.degraded:
                        self.log.info("Connection re-established.")
                        session.degraded = False
                except ConnectionError:
                    self.log.error("Failed to get data from device. Check network connection.")
                    session.degraded = True
                    time.sleep(1)  # wait between reconnection attempts
                    continue
                # When reconnecting after a network outage it's likely we'll
                # get some junk that can't be processed by
                # self.gauge.read_pressure_all(), but doesn't show as a
                # ConnectionError. This will just skip that one time and
                # resume data collecting.
                except Exception as e:
                    self.log.error(f"Caught unexpected error: {e}")
                    session.degraded = True
                    time.sleep(1)  # wait between reconnection attempts
                    continue

                # One field per reading, numbered from 1 in the order the
                # driver returned them.
                for channel, pressure in enumerate(pressure_array, start=1):
                    data['data'][f"pressure_ch{channel}"] = pressure

                self.agent.publish_to_feed('pressures', data)
                time.sleep(sleep_time)

                if params['test_mode']:
                    break

            self.agent.feeds['pressures'].flush_buffer()
        return True, 'Acquistion exited cleanly'

    def _stop_acq(self, session, params=None):
        """End pressure data acquisition.

        Clears the ``take_data`` flag so the ``acq`` loop exits after its
        current iteration.

        Returns:
            tuple: ``(True, message)`` if the flag was set and has been
            cleared, ``(False, message)`` if it was already clear.

        """
        if self.take_data:
            self.take_data = False
            return True, 'requested to stop taking data.'
        else:
            return False, 'acq is not currently running'
def make_parser(parser=None):
    """Build the argument parser for the Agent.

    Allows sphinx to automatically build documentation based on this
    function.

    Args:
        parser (argparse.ArgumentParser, optional): Existing parser to add
            the agent options to. A new one is created when None.

    Returns:
        argparse.ArgumentParser: The parser with an 'Agent Options' group
        holding ``--ip_address``, ``--port`` and ``--mode``.

    """
    parser = argparse.ArgumentParser() if parser is None else parser

    options = parser.add_argument_group('Agent Options')
    for flag in ('--ip_address', '--port'):
        options.add_argument(flag)
    options.add_argument("--mode", type=str, default='acq',
                         choices=['acq', 'test'])

    return parser


def main(args=None):
    """Configure logging, build the PfeifferAgent and run it.

    The ``acq`` process is registered with ``startup=True``, or with
    ``startup={'test_mode': True}`` when ``--mode test`` is given.

    Args:
        args: Argument list forwarded to ``site_config.parse_args``.

    """
    # Start logging
    log_level = os.environ.get("LOGLEVEL", "info")
    txaio.start_logging(level=log_level)

    args = site_config.parse_args(agent_class='PfeifferAgent',
                                  parser=make_parser(),
                                  args=args)

    # Value passed as the acq process' startup argument.
    startup = {'test_mode': True} if args.mode == 'test' else True

    agent, runner = ocs_agent.init_site_agent(args)
    pfeiffer = PfeifferAgent(agent, args.ip_address, args.port)

    agent.register_process('acq', pfeiffer.acq, pfeiffer._stop_acq,
                           startup=startup)
    agent.register_task('close', pfeiffer._stop_acq)

    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()