#!/usr/bin/env python3
'''OCS agent for dS378 ethernet relay
'''
import argparse
import os
import time
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from socs.agents.devantech_dS378.drivers import DS378
PORT_DEFAULT = 17123
LOCK_RELEASE_SEC = 1.
LOCK_RELEASE_TIMEOUT = 10
ACQ_TIMEOUT = 100
[docs]
class DS378Agent:
"""OCS agent class for dS378 ethernet relay
Parameters
----------
ip : string
IP address
port : int
Port number
"""
def __init__(self, agent, ip, port=PORT_DEFAULT):
self.active = True
self.agent = agent
self.log = agent.log
self.lock = TimeoutLock()
self.take_data = False
self._dev = DS378(ip=ip, port=port)
self.initialized = False
agg_params = {'frame_length': 60}
self.agent.register_feed('relay',
record=True,
agg_params=agg_params,
buffer_time=1)
[docs]
@ocs_agent.param('sampling_frequency', default=0.5, type=float)
def acq(self, session, params):
"""acq()
**Process** - Monitor status of the relay.
Parameters
----------
sampling_frequency : float, optional
Sampling frequency in Hz, defaults to 0.5 Hz.
Notes
-----
An example of the session data::
>>> response.session['data']
{'V_sppl': 11.8,
'T_int': 30.8,
'Relay_1': 0,
'Relay_2': ...,
'timestamp': 1736541796.779634
}
"""
f_sample = params.get('sampling_frequency', 0.5)
pace_maker = Pacemaker(f_sample)
with self.lock.acquire_timeout(timeout=0, job='acq') as acquired:
if not acquired:
self.log.warn(
f'Could not start acq because {self.lock.job} is already running')
return False, 'Could not acquire lock.'
self.take_data = True
session.data = {}
last_release = time.time()
while self.take_data:
# Release lock
if time.time() - last_release > LOCK_RELEASE_SEC:
last_release = time.time()
if not self.lock.release_and_acquire(timeout=LOCK_RELEASE_TIMEOUT):
print(f'Re-acquire failed: {self.lock.job}')
return False, 'Could not re-acquire lock.'
# Data acquisition
current_time = time.time()
data = {'timestamp': current_time, 'block_name': 'relay', 'data': {}}
try:
d_status = self._dev.get_status()
relay_list = self._dev.get_relays()
if session.degraded:
self.log.info('Connection re-established.')
session.degraded = False
except ConnectionError:
self.log.error('Failed to get data from relay. Check network connection.')
session.degraded = True
time.sleep(1)
continue
data['data']['V_sppl'] = d_status['V_sppl']
data['data']['T_int'] = d_status['T_int']
for i in range(8):
data['data'][f'Relay_{i + 1}'] = relay_list[i]
field_dict = {'V_sppl': d_status['V_sppl'],
'T_int': d_status['T_int']}
for i in range(8):
field_dict[f'Relay_{i + 1}'] = relay_list[i]
session.data.update(field_dict)
self.agent.publish_to_feed('relay', data)
session.data.update({'timestamp': current_time})
pace_maker.sleep()
self.agent.feeds['relay'].flush_buffer()
return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None):
if self.take_data:
self.take_data = False
return True, 'requested to stop taking data.'
return False, 'acq is not currently running.'
[docs]
@ocs_agent.param('relay_number', type=int, check=lambda x: 1 <= x <= 8)
@ocs_agent.param('on_off', type=int, choices=[0, 1])
@ocs_agent.param('pulse_time', default=None, type=int, check=lambda x: 0 <= x <= 2**32 - 1)
def set_relay(self, session, params=None):
"""set_relay(relay_number, on_off, pulse_time=None)
**Task** - Turns the relay on/off or pulses it.
Parameters
----------
relay_number : int
Relay number to manipulate. Values must be in range [1, 8].
on_off : int
1 (0) to turn on (off).
pulse_time : int, optional
Pulse time in ms. Values must be in range [0, 4294967295].
Notes
-----
This command pulses relay for a given period when ``pulse_time``
argument is specified, otherwise just turns a relay on or off.
"""
with self.lock.acquire_timeout(3, job='set_values') as acquired:
if not acquired:
self.log.warn('Could not start set_values because '
f'{self.lock.job} is already running')
return False, 'Could not acquire lock.'
if params.get('pulse_time') is None:
params['pulse_time'] = 0
self._dev.set_relay(relay_number=params['relay_number'],
on_off=params['on_off'],
pulse_time=params['pulse_time'])
return True, 'Set values'
[docs]
def get_relays(self, session, params=None):
"""get_relays()
**Task** - Returns current relay status.
Notes
-----
The most recent data collected is stored in session.data in the
structure::
>>> response.session['data']
{'Relay_1': 1,
'Relay_2': ...,
'timestamp': 1736541796.779634
}
"""
with self.lock.acquire_timeout(3, job='get_relays') as acquired:
if not acquired:
self.log.warn('Could not start get_relays because '
f'{self.lock.job} is already running')
return False, 'Could not acquire lock.'
d_status = self._dev.get_relays()
session.data = {f'Relay_{i + 1}': d_status[i] for i in range(8)}
session.data.update({'timestamp': time.time()})
return True, 'Got relay status'
def make_parser(parser=None):
if parser is None:
parser = argparse.ArgumentParser()
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--port', default=PORT_DEFAULT, type=int,
help='Port number for TCP communication.')
pgroup.add_argument('--ip_address',
help='IP address of the device.')
return parser
def main(args=None):
"""Boot OCS agent"""
txaio.start_logging(level=os.environ.get('LOGLEVEL', 'info'))
parser = make_parser()
args = site_config.parse_args(agent_class='DS378Agent',
parser=parser,
args=args)
agent_inst, runner = ocs_agent.init_site_agent(args)
ds_agent = DS378Agent(agent_inst, ip=args.ip_address, port=args.port)
agent_inst.register_task(
'set_relay',
ds_agent.set_relay
)
agent_inst.register_task(
'get_relays',
ds_agent.get_relays
)
agent_inst.register_process(
'acq',
ds_agent.acq,
ds_agent._stop_acq,
startup=True
)
runner.run(agent_inst, auto_reconnect=True)
if __name__ == '__main__':
main()