# Source code for socs.agents.pfeiffer_tpg366.drivers

# Original script by Zhilei Xu and Tanay Bhandarkar.

import numpy as np

from socs.tcp import TCPInterface

# Number of bytes requested per recv() call when reading device replies.
BUFF_SIZE = 4096
# ASCII ENQ (enquiry) control character; sent to the device to request
# transmission of the data selected by the preceding mnemonic.
ENQ = '\x05'


class TPG366(TCPInterface):
    """Interface class for connecting to the Pfeiffer TPG366 maxigauge
    controller.

    Parameters
    ----------
    ip_address : str
        IP address of the device.
    port : int
        Associated port for TCP communication. Default is 8000.
    timeout : float
        Duration in seconds that operations wait before giving up. Default is
        10 seconds.

    Attributes
    ----------
    comm : socket.socket
        Socket object that forms the connection to the controller.

    """

    def __init__(self, ip_address, port=8000, timeout=10):
        # Setup the TCP Interface
        super().__init__(ip_address, port, timeout)

        # On boot the TPG366 starts in 'continuous transmission' mode. This
        # sends a single 'ENQ', which stops transmission.
        try:
            self._send_enquiry()
            print('Startup ENQ response:', self.recv())
        except ConnectionError as e:
            # Best effort: a failed startup enquiry is reported, not raised.
            print(f'Encountered error connecting to device: {e}')

    def _send_mnemonic(self, mnemonic):
        r"""Send a mnemonic and verify the device acknowledges it.

        If the last line of the reply is not an ACK (``b'\x06'``), or the
        reply is empty, the receive buffer is drained once and the mnemonic
        is sent again.

        Parameters
        ----------
        mnemonic : str
            Unencoded mnemonic string, with terminating characters, i.e.
            'PRX\r'.

        Returns
        -------
        bytes
            Encoded response to the first transmission of the mnemonic.
            Typically an ACK, unless the device is in a strange state, in
            which case this is the reply that triggered the resend.

        """
        self.send(mnemonic.encode())
        resp = self.recv(bufsize=BUFF_SIZE)  # don't decode, expecting ACK
        # print('Raw response:', resp)  # useful debug print

        # if COM was on we'll get a long string, check just the last message.
        # An empty reply has no lines at all; treat it as a missing ACK
        # instead of failing with an IndexError.
        lines = resp.splitlines()
        ack = lines[-1].strip() if lines else b''
        if ack != b'\x06':
            print('WARN: Did not receive ACK. Attempting to drain buffer.')
            print('Buffer contents:', self.recv())
            print('Resending command:', mnemonic.encode())
            self._send_mnemonic(mnemonic)

        return resp

    def _send_enquiry(self):
        """Send a single ENQ character to request data transmission."""
        self.send(ENQ.encode())

    def send_and_recv(self, message):
        """Send message and request transmission of queried data from device.

        The flow control for querying the TPG366 involves first sending a
        message (referred to in the TPG366 manual as a mnemonic) to set the
        measuring mode, receiving positive feedback from the device, then
        sending a request for transmission from the device, followed finally
        by receiving the measurement data. This method combines these four
        steps into one.

        Parameters
        ----------
        message : str
            Mnemonic command code message with parameters.

        Returns
        -------
        str
            Decoded response from the device.

        """
        self._send_mnemonic(message)
        self._send_enquiry()
        return self.recv(bufsize=BUFF_SIZE).decode()

    def channel_power(self):
        """Check the power state of all channels.

        Sends the 'SEN' mnemonic and parses the first line of the reply as a
        comma separated list of integer states, one per channel.

        Returns
        -------
        list
            1-based channel numbers whose reported state equals 1. Empty if
            no channel reports that state.

        """
        # NOTE(review): confirm against the TPG366 manual that a SEN status
        # of 1 is the value that denotes a powered gauge.
        read_str = self.send_and_recv('SEN\r\n')
        first_line = read_str.split('\r')[0]
        power_states = np.array(first_line.split(','), dtype=int)

        # Always return a list (possibly empty) so callers can iterate the
        # result without first checking for None.
        return [index + 1
                for index, state in enumerate(power_states)
                if state == 1]

    def read_pressure(self, ch_no):
        """Measure the pressure of one given channel.

        Parameters
        ----------
        ch_no : int
            The channel to be measured (1-6).

        Returns
        -------
        float
            Channel pressure.

        """
        read_str = self.send_and_recv('PR%d\r\n' % ch_no)
        # The reading is the last comma separated field of the first line.
        pressure_str = read_str.split(',')[-1].split('\r')[0]
        return float(pressure_str)

    def read_pressure_all(self):
        """Measure the pressure of all channels.

        The first line of the 'PRX' reply is parsed as alternating
        ``state,pressure`` fields. The reading of any channel whose state is
        non-zero is replaced by 0.

        Returns
        -------
        list
            Pressure reading of each channel, as floats, in channel order.

        """
        read_str = self.send_and_recv('PRX\r\n')
        fields = read_str.split('\r')[0].split(',')

        # Even positions hold the gauge state, odd positions the pressure.
        gauge_states = np.array(fields[::2], dtype=int)
        pressures = [float(p) for p in fields[1::2]]

        # Zero out the reading of every gauge reporting a non-zero state.
        for j in np.flatnonzero(gauge_states != 0):
            pressures[j] = 0.

        return pressures