Source code for socs.tcp

import selectors
import socket


[docs] class TCPInterface: """Interface class for connecting to devices using TCP. Parameters ---------- ip_address : str IP address of the device. port : int Associated port for TCP communication. timeout : float Duration in seconds that operations wait before giving up. Attributes ---------- ip_address : str IP address of the device. port : int Associated port for TCP communication. timeout : float Duration in seconds that operations wait before giving up. comm : socket.socket Socket object that forms the connection to the device. """ def __init__(self, ip_address, port, timeout): self.ip_address = ip_address self.port = int(port) self.timeout = timeout self.comm = self._connect((self.ip_address, self.port)) def _connect(self, address): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) try: sock.connect(address) except TimeoutError: print(f"Connection not established within {self.timeout} seconds.") return except OSError as e: print(f"Unable to connect. {e}") return except Exception as e: print(f"Caught unexpected {type(e).__name__} while connecting:") print(f" {e}") return return sock def _reset(self): print("Resetting the connection to the device.") if self.comm: self.comm.close() self.comm = None self.comm = self._connect((self.ip_address, self.port))
[docs] def send(self, msg): """Send message to socket. This method will try to send the message and if it runs into any issues will try to re-establish the socket connection before trying to send the message again. If it fails a second time it raises an exception. If the connection has failed to reset from a previous ``send``, or has not yet been established, it will first try to connnect before sending the message. If it fails to establish the connection it will raise an exception. Parameters ---------- msg : str Message string to send on socket. Raises ------ ConnectionError Raised if the communication fails for any reason. """ if self.comm is None: print("Connection not established.") self._reset() if self.comm is None: raise ConnectionError("Unable to establish connection.") try: self.comm.sendall(msg) return except (BrokenPipeError, ConnectionResetError) as e: print(f"Connection error: {e}") self._reset() except TimeoutError as e: print(f"Timeout error while writing: {e}") self._reset() except Exception as e: print(f"Caught unexpected {type(e).__name__} during send:") print(f" {e}") self._reset() # Try a second time before giving up try: self.comm.sendall(msg) except (BrokenPipeError, ConnectionResetError) as e: print(f"Connection error: {e}") raise ConnectionError except TimeoutError as e: print(f"Timeout error while writing: {e}") raise ConnectionError except AttributeError: raise ConnectionError("Unable to reset connection.") except Exception as e: print(f"Caught unexpected {type(e).__name__} during send:") print(f" {e}") raise ConnectionError
def _check_ready(self): """Check socket is ready to read from.""" if self.comm is None: raise ConnectionError("Connection not established, not ready to read.") sel = selectors.DefaultSelector() sel.register(self.comm, selectors.EVENT_READ) if not sel.select(self.timeout): raise ConnectionError("Socket not ready to read. Possible timeout.")
[docs] def recv(self, bufsize=4096, reset_on_error=False): """Receive response from the device. This method will check if the socket is ready to be read from before performing the recv. If there is no data to read, or the socket is otherwise unready an exception is raised. Parameters ---------- bufsize : int Amount of data to be recieved in bytes. Defaults to 4096. reset_on_error : bool Flag to set whether to recreate the socket connection on error. This can be useful to effectively clear the network buffer, but some devices don't handle multiple rapid connections well. Defaults to False. Returns ------- ``str`` or ``bytes`` The response from the device. The return type depends on the device. Raises ------ ConnectionError Raised if the socket is not ready to read from. """ if reset_on_error: try: self._check_ready() except ConnectionError as e: print(e) self._reset() raise ConnectionError else: self._check_ready() try: data = self.comm.recv(bufsize) except OSError as e: print(f"Connection error: {e}") if reset_on_error: self._reset() raise ConnectionError except Exception as e: print(f"Caught unexpected {type(e).__name__} during recv:") print(f" {e}") if reset_on_error: self._reset() raise ConnectionError return data
def __del__(self): if self.comm: self.comm.close()