#!/usr/bin/env python3
"""dS378 ethernet relay"""
import sys
from enum import IntEnum
from socs.tcp import TCPInterface
# Command byte, length of expected response
GET_STATUS = 0x30, 8
SET_RELAY = 0x31, 1
SET_OUTPUT = 0x32, 1
GET_RELAYS = 0x33, 5
GET_INPUTS = 0x34, 2
GET_ANALOG = 0x35, 14
GET_COUNTERS = 0x36, 8
# Number of trial for send / receive
N_TRIAL = 2
LEN_BUFF = 1024
[docs]
class RelayStatus(IntEnum):
"""Relay status"""
on = 1
off = 0
[docs]
class DS378(TCPInterface):
"""dS378 ethernet relay"""
def __init__(self, ip, port, timeout=10):
super().__init__(ip, port, timeout)
def __del__(self):
self._com.close()
def _send(self, msg):
self.send(msg)
def _send1(self, msg_byte):
self._send(bytearray([msg_byte]))
def _recv(self):
rcv = self.recv(LEN_BUFF)
return rcv
def _recv1(self):
return self._recv()[0]
def _send_recv(self, msg, length):
# Check the length of the received message
# to drop invalid response when reconnecting.
for _ in range(N_TRIAL):
self._send(msg)
msg_rcv = self._recv()
if len(msg_rcv) == length:
return msg_rcv
raise ConnectionError
[docs]
def get_status(self):
"""Get status of the dS378 device.
Returns
-------
d_status : dict
Status information
"""
ret_bytes = self._send_recv(bytearray([GET_STATUS[0]]),
GET_STATUS[1])
d_status = {}
d_status['module_id'] = ret_bytes[0]
d_status['firm_ver'] = f'{ret_bytes[1]}.{ret_bytes[2]}'
d_status['app_ver'] = f'{ret_bytes[3]}.{ret_bytes[4]}'
d_status['V_sppl'] = ret_bytes[5] / 10
d_status['T_int'] = int.from_bytes(ret_bytes[6:8], signed=True, byteorder='big') / 10
return d_status
[docs]
def set_relay(self, relay_number, on_off, pulse_time=0):
"""Turns the relay on/off or pulses it
Parameters
----------
relay_number : int
relay_number, 1 -- 8
on_off : int or RelayStatus
1: on, 0: off
pulse_time : int, 32 bit
See document
Returns
-------
status : int
0: ACK
otherwise: NACK
"""
assert 1 <= relay_number <= 8
assert 0 <= pulse_time <= 2**32 - 1
assert on_off in [0, 1]
msg = bytearray([SET_RELAY[0], relay_number, on_off])
msg += pulse_time.to_bytes(4, byteorder='big')
self._send(msg)
return self._recv1()
[docs]
def set_output(self, io_num, on_off):
"""Set output on/off
Parameters
----------
io_num : int, 1 -- 7
I/O port number
on_off : int or RelayStatus
1: on, 0: off
"""
assert 1 <= io_num <= 7
assert on_off in [0, 1]
msg = bytearray([SET_OUTPUT[0], io_num, on_off])
self._send(msg)
return self._recv1()
[docs]
def get_relays(self):
"""Get relay states
Returns
-------
d_status : list of RelayStatus
"""
ret_bytes = self._send_recv(bytearray([GET_RELAYS[0], 1]),
GET_RELAYS[1])
d_status = [None] * 32
for i in range(32):
d_status[i] = RelayStatus((ret_bytes[4 - int(i / 8)] >> (i % 8)) & 1)
return d_status
[docs]
def get_analog(self):
"""Get analog input
Returns
-------
values : list of int
"""
ret_bytes = self._send_recv(bytearray([GET_ANALOG[0]]),
GET_ANALOG[1])
values = [None] * 7
for i in range(7):
values[i] = int.from_bytes(ret_bytes[2 * i:2 * i + 2], byteorder='big')
return values
[docs]
def get_counters(self, counter_number):
"""Get counters
Parameter
---------
counter_number : int
counter number. 1 -- 8
Returns
-------
c_current : int
current counter value
c_reg : register
capture register for the counter
"""
ret_bytes = self._send_recv(bytearray([GET_COUNTERS[0], counter_number]),
GET_COUNTERS[1])
c_current = int.from_bytes(ret_bytes[0:4], byteorder='big')
c_reg = int.from_bytes(ret_bytes[4:8], byteorder='big')
return c_current, c_reg
def main():
"""Main function"""
ds_dev = DS378(sys.argv[1], 17123)
print(ds_dev.get_status())
print(ds_dev.get_relays()[:8])
ds_dev.set_relay(3, RelayStatus.off)
print(ds_dev.get_relays()[:8])
print(ds_dev.get_analog())
print(ds_dev.get_inputs())
if __name__ == '__main__':
main()