import argparse
import os
import socket
import time
import numpy
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
[docs]
class LDMonitor:
"""Receives and decodes data of the lightning detector via UDP
Parameters
----------
host : str
Address of the computer reading the data.
port : int
Port of host where data will be received, default 1110.
Attributes
----------
port : int
Port number in the local host to be bound to receive the data
log : txaio.tx.Logger
txaio logger object, created by the OCSAgent
sockopen : bool
Indicates when the socket is open
inittime : float
Logs the time at which initialization was carried out
data_dict : dictionary
Dictionary data stored from the lightning detector
newdata_dict : dictionary
Dictionary where new data is received
"""
def __init__(self, port=1110):
self.port = port
self.log = txaio.make_logger()
# check if socket has been opened
if hasattr(self, 'sockopen'):
if self.sockopen:
self.sock.close()
self.log.info('Socket closed preemptively')
# open and bing socket to receieve lightning detector data
try:
self.log.info('Opening socket')
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except BaseException:
self.log.info('Failed to create socket')
try:
self.log.info('Binding socket')
self.sock.bind(('', self.port))
self.sockopen = True
self.inittime = time.time()
except BaseException:
self.log.info('Failed to bind socket')
# initialize variables to account for absence of previous data
self.data_dict = {
'd_type': numpy.nan,
'field_value': numpy.nan,
'rot_fault': 0,
'time_last': -1.,
'tsince_last': -1.,
'dist': -1,
'unit_d': 0,
'high_field': -1,
'hifield_value': -1000.,
'alarm_r': 0,
'alarm_o': 0,
'alarm_y': 0,
'delay_g': 1,
'clear': 0,
'r_timer': 0,
'o_timer': 0,
'y_timer': 0,
'g_timer': 0,
'allclear_timer': 0,
'faultcode': 0
}
self.log.info('LDMonitor function initialized')
[docs]
def read_data(self):
"""
Receives data from the lightning detector via UDP,
and returns a dictionary containing formatted data
"""
self.data, _ = self.sock.recvfrom(1024)
self.data = self.data.decode('utf-8')
# "e-field" sentence
if self.data[0] == '$':
data_split = self.data[1:].split(',')
rot_fault = int(data_split[1].split('*')[0])
self.newdata_dict = {
'd_type': 0,
'field_value': float(data_split[0]),
'rot_fault': rot_fault
}
self.data_dict.update(self.newdata_dict)
return self.data_dict
elif self.data[0] == '@':
param = self.data[1:3]
# "lightning strike" sentence
if param == 'LI':
data_split = self.data.split(',')[1:]
if data_split[1].split('*')[0] == 'Miles':
unit_d = 0
elif data_split[1].split('*')[0] == 'Km':
unit_d = 1
self.newdata_dict = {
'd_type': 1,
'time_last': time.time(),
'dist': int(data_split[0]),
'unit_d': unit_d
}
self.data_dict.update(self.newdata_dict)
self.log.info('Lightning strike detected!')
return self.data_dict
# "high e-field" sentence, account for 2 types
elif param == 'HF':
data_split = self.data[1:].split(',')
if len(data_split) == 1:
self.newdata_dict = {
'd_type': 2,
'high_field': 1,
'hifield_value': float(self.data_dict['field_value'])
}
else:
self.newdata_dict = {
'd_type': 2,
'hifield_value': float(data_split[1])
}
self.data_dict.update(self.newdata_dict)
return self.data_dict
# "status" sentence
elif param == 'ST':
faultcode = int(self.data.split(',')[-1].split('*')[0], 16)
data_split = [int(i) for i in self.data.split(',')[1:-1]]
self.newdata_dict = {
'd_type': 3,
'alarm_r': data_split[0],
'alarm_o': data_split[1],
'alarm_y': data_split[2],
'delay_g': data_split[3],
'clear': data_split[4],
'r_timer': data_split[5],
'o_timer': data_split[6],
'y_timer': data_split[7],
'g_timer': data_split[8],
'allclear_timer': data_split[9],
'faultcode': faultcode
}
self.data_dict.update(self.newdata_dict)
return self.data_dict
# disregard "alarm timers" sentence, but update sentence type
elif param == 'WT':
self.newdata_dict = {'d_type': 4}
self.data_dict.update(self.newdata_dict)
return self.data_dict
[docs]
def read_cycle(self):
"""
In each cycle data is read and then parsed following
the format required to publish data to the ocs feed
"""
try:
self.read_data()
# updates time since last strike if strike data exists
if self.data_dict['time_last'] == -1.:
self.data_dict['tsince_last'] = -1.
else:
self.data_dict['tsince_last'] = (time.time()
- self.data_dict['time_last'])
return self.data_dict
except BaseException:
pass
self.log.info('LD data read error, passing to next data iteration')
[docs]
class LDMonitorAgent:
"""Monitor the Lightning Detector data via UDP.
Parameters
----------
agent : OCSAgent
OCSAgent object which forms this Agent
sample_interval : float
Time between samples in seconds.
Attributes
----------
agent : OCSAgent
OCSAgent object which forms this Agent
take_data : bool
Tracks whether or not the agent is actively issuing SNMP GET commands
to the ibootbar. Setting to false stops sending commands.
log : txaio.tx.Logger
txaio logger object, created by the OCSAgent
"""
def __init__(self, agent, sample_interval=15.):
self.agent: ocs_agent.OCSAgent = agent
self.log = agent.log
self.lock = TimeoutLock()
self.pacemaker_freq = 1. / sample_interval
self.initialized = False
self.take_data = False
self.LDMonitor = None
agg_params = {
'frame_length': 10 * 60 # [sec]
}
self.agent.register_feed('ld_monitor',
record=True,
agg_params=agg_params,
buffer_time=0)
def _connect(self):
"""connect()
Instantiates LD object and check if client is open
"""
self.LDMonitor = LDMonitor()
self.initialized = True
[docs]
@ocs_agent.param('auto_acquire', default=False, type=bool)
def init_ld_monitor(self, session, params=None):
"""init_ld_monitor(auto_acquire=False)
**Task** - Perform first time setup of the LD.
Parameters:
auto_acquire (bool, optional): Starts data acquisition after
initialization if True. Defaults to False.
"""
if self.initialized:
return True, "Already initialized."
with self.lock.acquire_timeout(3, job='init') as acquired:
if not acquired:
self.log.warn("Could not start init because "
"{} is already running".format(self.lock.job))
return False, "Could not acquire lock."
self._connect()
if not self.initialized:
return False, 'Could not connect to LD'
# Start data acquisition if requested
if params['auto_acquire']:
self.agent.start('acq')
return True, 'LD initialized.'
[docs]
@ocs_agent.param('_')
def acq(self, session, params=None):
"""acq()
**Process** - Starts the data acquisition process
Notes
_____
The most recent data collected is stored in session data in the
structure::
>>> response.session['data']
{'fields':{
'd_type': 3, 'field_value': 0.28, 'rot_fault': 0,
'time_last': -1.0, 'tsince_last': -1.0, 'dist': -1, 'unit_d': 0,
'high_field': -1, 'hifield_value': -1000.0, 'alarm_r': 0,
'alarm_o': 0, 'alarm_y': 0, 'delay_g': 1, 'clear': 1, 'r_timer': 0,
'o_timer': 0, 'y_timer': 0, 'g_timer': 0, 'allclear_timer': 0,
'faultcode': 0
},
...
'connection': {
'conn_timestamp': 1711285858.1063662,
'connected': True}, 'data_timestamp': 1711285864.6254003
}
}
"""
with self.lock.acquire_timeout(0, job='acq') as acquired:
if not acquired:
self.log.warn("Could not start acq because {} is already running"
.format(self.lock.job))
return False, "Could not acquire lock."
self.take_data = True
session.data = {"fields": {}}
pm = Pacemaker(self.pacemaker_freq)
while self.take_data:
pm.sleep()
current_time = time.time()
data = {
'timestamp': current_time,
'connection': {},
'data': {}
}
if not self.LDMonitor.sockopen:
self.initialized = False
# Try to re-initialize if connection lost
if not self.initialized:
self._connect()
# Only get readings if connected
if self.initialized:
session.data.update({'connection': {'conn_timestamp': self.LDMonitor.inittime,
'connected': True}})
ld_data = self.LDMonitor.read_cycle()
if ld_data:
for key, value in ld_data.items():
data['data'][key] = value
session.data.update({'data_timestamp': current_time,
'fields': ld_data})
self.log.debug(ld_data)
else:
self.log.info('Connection error or error in processing data.')
self.initialized = False
# Continue trying to connect
if not self.initialized:
session.data.update({'connection': {'last_attempt': time.time(),
'connected': False}})
self.log.info('Trying to reconnect.')
continue
for field, val in data['data'].items():
_data = {
'timestamp': current_time,
'block_name': field,
'data': {field: val}
}
self.agent.publish_to_feed('ld_monitor', _data)
self.agent.feeds['ld_monitor'].flush_buffer()
return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None):
"""
Stops acq process.
"""
if self.take_data:
self.take_data = False
return True, 'requested to stop taking data.'
else:
return False, 'acq is not currently running'
def make_parser(parser=None):
if parser is None:
parser = argparse.ArgumentParser()
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'],
help="Starting action for the agent.")
pgroup.add_argument("--sample-interval", type=float, default=.2, help="Time between samples in seconds.")
return parser
def main(args=None):
# Start logging
txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))
parser = make_parser()
# Interpret options in the context of site_config.
args = site_config.parse_args(agent_class='LDMonitor',
parser=parser,
args=args)
# Automatically acquire data if requested (default)
init_params = False
if args.mode == 'init':
init_params = {'auto_acquire': False}
elif args.mode == 'acq':
init_params = {'auto_acquire': True}
print('init_params', init_params)
agent, runner = ocs_agent.init_site_agent(args)
p = LDMonitorAgent(agent, sample_interval=args.sample_interval)
agent.register_task('init_ld_monitor', p.init_ld_monitor,
startup=init_params)
agent.register_process('acq', p.acq, p._stop_acq)
runner.run(agent, auto_reconnect=True)
if __name__ == '__main__':
main()