Source code for socs.testing.device_emulator

import logging
import shutil
import socket
import subprocess
import threading
import time
import traceback as tb
from copy import deepcopy
from typing import Dict

import pytest
import serial
from serial.serialutil import SerialException


def create_device_emulator(responses, relay_type, port=9001, encoding='utf-8',
                           reconnect=False):
    """Create a device emulator fixture.

    This provides a device emulator that can be used to mock a device during
    testing.

    Args:
        responses (dict): Dictionary with commands as keys, and responses as
            values. See :class:`.DeviceEmulator` for details.
        relay_type (str): Communication relay type. Either 'serial' or 'tcp'.
        port (int): Port for the TCP relay to listen for connections on.
            Defaults to 9001. Using port 0 will randomly select an available
            port. Only used if relay_type is 'tcp'.
        encoding (str): Encoding for the messages and responses. See
            :class:`socs.testing.device_emulator.DeviceEmulator` for more
            details.
        reconnect (bool): If True, on TCP client disconnect, the emulator will
            listen for new incoming connections instead of quitting.

    Returns:
        function: A pytest fixture that creates a Device emulator of the
        specified type.

    Raises:
        NotImplementedError: If ``relay_type`` is neither 'serial' nor 'tcp'.

    """
    # Validate eagerly so a typo fails when the fixture is defined rather
    # than when the first test requests it.
    if relay_type not in ('serial', 'tcp'):
        raise NotImplementedError(
            f"relay_type '{relay_type}' is not implemented or is an invalid type")

    @pytest.fixture()
    def create_device():
        device = DeviceEmulator(responses, encoding, reconnect=reconnect)

        if relay_type == 'serial':
            device.create_serial_relay()
        elif relay_type == 'tcp':
            device.create_tcp_relay(port)

        # Hand the running emulator to the test; everything after the yield
        # is the fixture teardown.
        yield device

        device.shutdown()

    return create_device
class DeviceEmulator:
    """A mocked device which knows how to respond on various communication
    channels.

    Args:
        responses (dict): Initial responses, any response required by Agent
            startup, if any.
        encoding (str): Encoding for the messages and responses.
            DeviceEmulator will try to encode and decode messages with the
            given encoding. No encoding is used if set to None. That can be
            useful if you need to use raw data from your hardware. Defaults
            to 'utf-8'.
        reconnect (bool): If True, on TCP client disconnect, the emulator will
            listen for new incoming connections instead of quitting.

    Attributes:
        responses (dict): Current set of responses the DeviceEmulator would
            give. Should all be strings, not bytes-like.
        default_response (str): Default response to send if a command is
            unrecognized. No response is sent and an error message is logged
            if a command is unrecognized and the default response is set to
            None. Defaults to None.
        encoding (str): Encoding for the messages and responses, set by the
            encoding argument.
        socket_port (int): Port that the DeviceEmulator is listening on if
            using the 'tcp' relay.
        _type (str): Relay type, either 'serial' or 'tcp'.
        _read (bool): Used to stop the background reading of data received on
            the relay.
        _conn (socket.socket): TCP connection for use in 'tcp' relay.
        _relay_open (bool): True between a successful ``create_*_relay`` call
            and the matching ``shutdown``; makes ``shutdown`` idempotent.

    """

    def __init__(self, responses, encoding='utf-8', reconnect=False):
        # Copy so that popping list responses never mutates the caller's dict.
        self.responses = deepcopy(responses)
        self.default_response = None
        self.encoding = encoding
        self.reconnect = reconnect
        self._type = None
        self._read = True
        self._conn = None
        self.socket_port = None
        self._relay_open = False

        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.DEBUG)
        # The logger is shared by name across instances; only attach a
        # handler the first time so messages are not duplicated.
        if len(self.logger.handlers) == 0:
            formatter = logging.Formatter("%(asctime)s - %(name)s: %(message)s")
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    @staticmethod
    def _setup_socat(responder_link='./responder'):
        """Setup a data relay with socat.

        The "./responder" link is the external end of the relay, which the
        Agent should connect to. "./internal" is used within the
        DeviceEmulator object to accept commands and to respond to the Agent.

        Args:
            responder_link (str): Path of the external pty link.

        Returns:
            subprocess.Popen: Handle of the running ``socat`` process.

        Raises:
            FileNotFoundError: If the ``socat`` executable is not on PATH.

        """
        socat = shutil.which('socat')
        if socat is None:
            # Without this check Popen receives None and fails with an
            # obscure TypeError.
            raise FileNotFoundError(
                "socat executable not found on PATH; it is required for the serial relay")

        cmd = [socat, '-d', '-d',
               f'pty,link={responder_link},b57600',
               'pty,link=./internal,b57600']
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # NOTE(review): presumably gives socat time to create both pty links
        # before they are opened -- confirm.
        time.sleep(1)

        return proc

    def create_serial_relay(self, responder_link='./responder'):
        """Create the serial relay, emulating a hardware device connected over
        serial.

        This first uses ``socat`` to setup a relay. It then connects to the
        "internal" end of the relay, ready to receive communications sent to
        the "responder" end of the relay. This end of the relay is located at
        ``./responder``. You will need to configure your Agent to use that
        path for communication.

        Next it creates a thread to read commands sent to the serial relay in
        the background. This allows responses to be defined within a test
        using DeviceEmulator.define_responses() after instantiation of the
        DeviceEmulator object within a given test.

        Args:
            responder_link (str): Path of the external end of the relay.

        """
        self._type = 'serial'
        self.proc = self._setup_socat(responder_link=responder_link)
        # From here on there is a socat process that shutdown() must reap,
        # even if opening the serial port below fails.
        self._relay_open = True
        self.ser = serial.Serial(
            './internal',
            baudrate=57600,
            timeout=5,
        )
        bkg_read = threading.Thread(name='background',
                                    target=self._read_serial)
        bkg_read.start()

    def get_response(self, msg):
        """Determine the response to a given message.

        Args:
            msg (str): Command string to get the response for.

        Returns:
            str: Response string. Will return None if a valid response is not
            found.

        """
        if self.responses is None:
            return None

        if msg not in self.responses and self.default_response is not None:
            return self.default_response

        try:
            candidate = self.responses[msg]
            if isinstance(candidate, list):
                # List responses are consumed in order, one per request.
                return candidate.pop(0)
            return candidate
        except LookupError as e:
            # KeyError: unknown command with no default response.
            # IndexError: a list response that has been depleted.
            self.logger.info(f"Responses: {self.responses}")
            self.logger.info(f"encountered error {e}")
            self.logger.info(tb.format_exc())
            return None

    def _normalized_response(self, msg):
        """Look up the response for ``msg``, converting a bytes-like response
        to ``str`` with the configured encoding when one is set."""
        response = self.get_response(msg)
        # Avoid user providing bytes-like response
        if isinstance(response, bytes) and self.encoding is not None:
            response = response.decode(self.encoding)
        return response

    def _read_serial(self):
        """Loop until shutdown, reading any commands sent over the relay.
        Respond immediately to a command with the response in self.responses.

        """
        self._read = True

        while self._read:
            if self.ser.in_waiting > 0:
                try:
                    msg = self.ser.readline()
                except SerialException as e:
                    self.logger.error(f"Serial error: {e}")
                    self.ser.close()
                    return
                if self.encoding:
                    msg = msg.strip().decode(self.encoding)
                self.logger.debug(f"msg='{msg}'")

                response = self._normalized_response(msg)
                if response is None:
                    continue

                self.logger.debug(f"response='{response}'")
                if self.encoding:
                    response = (response + '\r\n').encode(self.encoding)
                self.ser.write(response)

            time.sleep(0.01)

    def __del__(self):
        # shutdown() is idempotent, so this is a no-op when the relay was
        # already shut down explicitly or never created.
        self.shutdown()

    def shutdown(self):
        """Shutdown communication on the configured relay. This will stop any
        attempt to read communication on the relay, as well as shutdown the
        relay itself.

        Calling this more than once, or before any relay has been created, is
        safe and returns immediately.

        """
        self._read = False

        # getattr: __del__ may run on an instance whose __init__ never ran.
        if not getattr(self, '_relay_open', False):
            return
        self._relay_open = False

        # Give the background reader time to notice the cleared flag.
        time.sleep(1)

        if self._type == 'serial':
            ser = getattr(self, 'ser', None)
            if ser is not None:
                ser.close()
            self.proc.terminate()
            # Reap the process and drain its pipes.
            self.proc.communicate()
        elif self._type == 'tcp':
            if self._conn:
                self._conn.close()
                self.socket_port = None
            else:
                # No connection has been made, force a connection to cleanly
                # shutdown thread
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as waker:
                    waker.connect(('127.0.0.1', self.socket_port))
            self._sock.close()

    def _read_socket(self, port):
        """Loop until shutdown, reading any commands sent over the relay.
        Respond immediately to a command with the response in self.responses.

        Args:
            port (int): Port for the TCP relay to listen for connections on.

        """
        self._read = True

        # Listen for connections
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        while True:
            try:
                self._sock.bind(('127.0.0.1', port))
                break
            except OSError:
                self.logger.error(f"Failed to bind to port {port}, trying again...")
                time.sleep(1)

        # Save port address, useful for if an arbitrary port 0 is used
        self.socket_port = self._sock.getsockname()[1]
        self._sock.listen(1)
        # Publish readiness last: create_tcp_relay() returns as soon as this
        # flag is set, and callers then expect socket_port to be valid and
        # the socket to be accepting connections.
        self._sock_bound = True

        self.logger.info("Device emulator waiting for tcp client connection")
        self._conn, client_address = self._sock.accept()
        self.logger.info(f"Client connection made from {client_address}")

        while self._read:
            try:
                msg = self._conn.recv(4096)
                if not msg:
                    self.logger.info("Client disconnected")
                    if self.reconnect:
                        self.logger.info("Waiting for new connection")
                        # Release the dead connection before waiting for its
                        # replacement so it is not leaked.
                        self._conn.close()
                        # attempt to reconnect
                        self._conn, client_address = self._sock.accept()
                        self.logger.info(f"Client connection made from {client_address}")
                        continue
                    else:
                        self.logger.info("Shutting down")
                        break
            # Was seeing this on tests in the cryomech agent
            except ConnectionResetError:
                self.logger.info('Caught connection reset on Agent clean up')
                break

            if self.encoding:
                msg = msg.strip().decode(self.encoding)
            if msg:
                self.logger.debug(f"msg='{msg}'")

                response = self._normalized_response(msg)
                if response is None:
                    continue

                self.logger.debug(f"response='{response}'")
                if self.encoding:
                    response = response.encode(self.encoding)
                self._conn.sendall(response)

            time.sleep(0.01)

        self._conn.close()
        self._sock.close()

    def create_tcp_relay(self, port):
        """Create the TCP relay, emulating a hardware device connected over
        TCP.

        Creates a thread to read commands sent to the TCP relay in the
        background. This allows responses to be defined within a test using
        DeviceEmulator.define_responses() after instantiation of the
        DeviceEmulator object within a given test.

        Args:
            port (int): Port for the TCP relay to listen for connections on.

        Notes:
            This will not return until the socket is properly bound to the
            given port. If this setup is not working it is likely another
            device emulator instance is not yet finished or has not been
            properly shutdown.

        """
        self._type = 'tcp'
        self._sock_bound = False
        bkg_read = threading.Thread(name='background',
                                    target=self._read_socket,
                                    kwargs={'port': port})
        bkg_read.start()

        # wait for socket to bind properly before returning
        while not self._sock_bound:
            time.sleep(0.1)
        # Only now is there a listening socket for shutdown() to tear down.
        self._relay_open = True

    def update_responses(self, responses: Dict):
        """Updates the current responses. See ``define_responses`` for more
        detail.

        Args:
            responses (dict): Dict of commands to use to update the current
                responses.

        """
        if self.responses is None:
            self.responses = {}
        # Copy, as define_responses() does, so popping list responses does
        # not drain the caller's lists.
        self.responses.update(deepcopy(responses))
        self.logger.info(f"responses set to {self.responses}")

    def define_responses(self, responses, default_response=None):
        """Define what responses are available to reply with on the configured
        communication relay.

        Args:
            responses (dict): Dictionary of commands: response. Values can be
                a list, in which case the responses in the list are popped and
                given in order until depleted.
            default_response (str): Default response to send if a command is
                unrecognized. No response is sent and an error message is
                logged if a command is unrecognized and the default response
                is set to None. Defaults to None.

        Examples:
            The given responses might look like::

                >>> responses = {'KRDG? 1': '+1.7E+03'}
                >>> responses = {'*IDN?': 'LSCI,MODEL425,4250022,1.0',
                                 'RDGFIELD?': ['+1.0E-01', '+1.2E-01', '+1.4E-01']}

        Notes:
            The DeviceEmulator will handle encoding/decoding. The responses
            defined should all be strings, not bytes-like, unless you set
            ``encoding=None``.

        """
        self.logger.info(f"responses set to {responses}")
        self.responses = deepcopy(responses)
        self.logger.info(f"default response set to '{default_response}'")
        self.default_response = default_response