# Script to log and readout pfeiffer TPG 366 gauge contoller
# via Ethernet connection
# Zhilei Xu, Tanay Bhandarkar
import argparse
import socket
import time
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock
BUFF_SIZE = 128
ENQ = '\x05'
class Pfeiffer:
"""CLASS to control and retrieve data from the pfeiffer tpg366
pressure gauge controller
Args:
ip_address: IP address of the deivce
porti (int): 8000 (fixed for the device)
Attributes:
read_pressure reads the pressure from one channel (given as an argument)
read_pressure_all reads pressures from the six channels
close closes the socket
"""
def __init__(self, ip_address, port, timeout=10,
f_sample=2.5):
self.ip_address = ip_address
self.port = port
self.comm = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.comm.connect((self.ip_address, self.port))
self.comm.settimeout(timeout)
def read_pressure(self, ch_no):
"""
Function to measure the pressure of one given channel
ch_no is the chanel to be measured (e.g. 1-6)
returns the measured pressure as a float
Args:
ch_no: The channel to be measured (1-6)
Returns:
pressure as a float
"""
msg = 'PR%d\r\n' % ch_no
self.comm.send(msg.encode())
# Can use this to catch exemptions, for troubleshooting
self.comm.recv(BUFF_SIZE).decode()
self.comm.send(ENQ.encode())
read_str = self.comm.recv(BUFF_SIZE).decode()
pressure_str = read_str.split(',')[-1].split('\r')[0]
pressure = float(pressure_str)
return pressure
def read_pressure_all(self):
"""measure the pressure of all channel
Return an array of 6 pressure values as a float array
Args:
None
Returns:
6 element array corresponding to each channels
pressure reading, as floats
"""
msg = 'PRX\r\n'
self.comm.send(msg.encode())
# Could use this to catch exemptions, for troubleshooting
self.comm.recv(BUFF_SIZE).decode()
self.comm.send(ENQ.encode())
read_str = self.comm.recv(BUFF_SIZE).decode()
pressure_str = read_str.split('\r')[0]
# gauge_states = pressure_str.split(',')[::2]
pressures = pressure_str.split(',')[1::2]
pressures = [float(p) for p in pressures]
return pressures
def close(self):
"""Close the socket of the connection"""
self.comm.close()
[docs]
class PfeifferAgent:
def __init__(self, agent, ip_address, port, f_sample=2.5):
self.active = True
self.agent = agent
self.log = agent.log
self.lock = TimeoutLock()
self.f_sample = f_sample
self.take_data = False
self.gauge = Pfeiffer(ip_address, int(port))
agg_params = {'frame_length': 60, }
self.agent.register_feed('pressures',
record=True,
agg_params=agg_params,
buffer_time=1)
[docs]
@ocs_agent.param('sampling_frequency', type=float, default=2.5)
@ocs_agent.param('test_mode', type=bool, default=False)
def acq(self, session, params=None):
"""acq(sampling_frequency=2.5, test_mode=False)
**Process** - Get pressures from the Pfeiffer gauges.
Parameters:
sampling_frequency (float): Rate at which to get the pressures
[Hz]. Defaults to 2.5 Hz.
test_mode (bool): Run the Process loop only once. This is meant
only for testing. Defaults to False.
"""
f_sample = params['sampling_frequency']
if f_sample is None:
f_sample = self.f_sample
sleep_time = 1. / f_sample - 0.01
with self.lock.acquire_timeout(timeout=0, job='init') as acquired:
# Locking mechanism stops code from proceeding if no lock acquired
if not acquired:
self.log.warn("Could not start init because {} is already running".format(self.lock.job))
return False, "Could not acquire lock."
self.take_data = True
while self.take_data:
data = {
'timestamp': time.time(),
'block_name': 'pressures',
'data': {}
}
pressure_array = self.gauge.read_pressure_all()
# Loop through all the channels on the device
for channel in range(len(pressure_array)):
data['data']["pressure_ch" + str(channel + 1)] = pressure_array[channel]
self.agent.publish_to_feed('pressures', data)
time.sleep(sleep_time)
if params['test_mode']:
break
self.agent.feeds['pressures'].flush_buffer()
return True, 'Acquistion exited cleanly'
def _stop_acq(self, session, params=None):
"""
End pressure data acquisition
"""
if self.take_data:
self.take_data = False
self.gauge.close()
return True, 'requested to stop taking data.'
else:
return False, 'acq is not currently running'
def make_parser(parser=None):
"""Build the argument parser for the Agent. Allows sphinx to automatically
build documentation based on this function.
"""
if parser is None:
parser = argparse.ArgumentParser()
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--ip_address')
pgroup.add_argument('--port')
pgroup.add_argument("--mode", type=str, default='acq', choices=['acq', 'test'])
return parser
def main(args=None):
parser = make_parser()
args = site_config.parse_args(agent_class='PfeifferAgent',
parser=parser,
args=args)
init_params = True
if args.mode == 'test':
init_params = {'test_mode': True}
agent, runner = ocs_agent.init_site_agent(args)
pfeiffer_agent = PfeifferAgent(agent, args.ip_address, args.port)
agent.register_process('acq', pfeiffer_agent.acq,
pfeiffer_agent._stop_acq, startup=init_params)
agent.register_task('close', pfeiffer_agent._stop_acq)
runner.run(agent, auto_reconnect=True)
if __name__ == '__main__':
main()