#!/usr/bin/env python
import argparse
import time
import requests
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock
class SynaccessAgent:
    """
    Agent to control and monitor Synaccess Networks PDUs.

    Args:
        agent: OCS agent object. Its ``log``, ``register_feed``,
            ``publish_to_feed`` and ``feeds`` members are used here.
        ip_address(str): IP address for the device.
        username(str): Username credential to login to device.
        password(str): Password credential to login to device.
        outlet_names(list of str): List of outlet names. Missing entries
            (up to ``num_outlets``) are filled in as ``outlet<N>``. The
            list passed in is copied, never modified.
        num_outlets(int): Number of outlets for device. Anything accepted
            by ``int()`` is allowed, so a string from the command line
            works too.
    """

    def __init__(self, agent, ip_address, username, password, outlet_names=None, num_outlets=5):
        self.agent = agent
        self.log = agent.log
        self.lock = TimeoutLock()
        self.ip_address = ip_address
        self.user = username
        self.passw = password

        # Defined up front so stop_acq() can be called safely even if acq()
        # has never run.
        self.take_data = False

        # Command-line values may arrive as strings; range() needs an int.
        num_outlets = int(num_outlets)

        # Work on a copy so the caller's list is not mutated, then pad with
        # default names until there is one name per outlet.
        names = [] if outlet_names is None else list(outlet_names)
        names.extend('outlet%i' % (i + 1)
                     for i in range(len(names), num_outlets))
        self.outlet_names = names

        agg_params = {'frame_length': 60}
        self.agent.register_feed(
            'synaccess', record=True, agg_params=agg_params)

    def _send_command(self, command):
        """Issue a ``cmd.cgi`` command to the device and return the response.

        Args:
            command (str): Command string appended after ``/cmd.cgi?``,
                e.g. ``"$A5"`` or ``"$A3 1 0"``.

        Returns:
            The object returned by ``requests.get``.
        """
        # NOTE(review): no timeout is passed, so a non-responsive device
        # blocks the caller (and keeps self.lock held) indefinitely.
        req = "http://" + self.user + ":" + self.passw + "@" + \
            self.ip_address + "/cmd.cgi?" + command
        return requests.get(req)

    def _outlet_name(self, index):
        """Return the configured name for a zero-based outlet index.

        Falls back to ``outlet<index + 1>`` when no name was configured for
        that index, which happens when fewer names exist than status
        characters are parsed from the device.
        """
        if index < len(self.outlet_names):
            return self.outlet_names[index]
        return 'outlet%i' % (index + 1)

    def __get_status(self):
        """Query the device (``$A5``) and return the outlet status string.

        Returns:
            str: The last five characters of the response, reversed. Callers
            treat ``'1'`` as on and anything else as off.
        """
        r = self._send_command("$A5")
        # NOTE(review): the slice is hard-coded to five characters regardless
        # of num_outlets - confirm against devices with other outlet counts.
        resp = r.content.decode()[:-6:-1]  # get last 5 elements, in reverse
        return resp

    @ocs_agent.param('_')
    def get_status(self, session, params=None):
        """get_status()

        **Task** - Get the status of all outlets.

        """
        with self.lock.acquire_timeout(3, job='get_status') as acquired:
            if not acquired:
                return False, "Could not acquire lock"

            resp = self.__get_status()
            ret_str = ['on' if x == '1' else 'off' for x in resp]
            return True, 'status outlet [1,2,3,4,5] is {}'.format(ret_str)

    @ocs_agent.param('outlet', type=int)
    def reboot(self, session, params=None):
        """reboot(outlet)

        **Task** - Reboot a given outlet.

        Parameters:
            outlet (int): The outlet that we are changing the state of.

        """
        with self.lock.acquire_timeout(3, job='reboot') as acquired:
            if not acquired:
                return False, "Could not acquire lock"

            self._send_command("$A4" + " " + str(params['outlet']))
            return True, 'Rebooted outlet {}'.format(params['outlet'])

    @ocs_agent.param('outlet', type=int)
    @ocs_agent.param('on', type=bool)
    def set_outlet(self, session, params=None):
        """set_outlet(outlet, on)

        **Task** - Set a particular outlet on/off.

        Parameters:
            outlet (int): The outlet that we are changing the state of.
            on (bool): The new state. True for on, False for off.

        """
        with self.lock.acquire_timeout(3, job='set_outlet') as acquired:
            if not acquired:
                return False, "Could not acquire lock"

            on = "1" if params['on'] else "0"
            self._send_command(
                "$A3" + " " + str(params['outlet']) + " " + on)
            return True, 'Set outlet {} to {}'.\
                format(params['outlet'], params['on'])

    @ocs_agent.param('on', type=bool)
    def set_all(self, session, params=None):
        """set_all(on)

        **Task** - Set all outlets on/off.

        Parameters:
            on (bool): The new state. True for on, False for off.

        """
        with self.lock.acquire_timeout(3, job='set_all') as acquired:
            if not acquired:
                return False, "Could not acquire lock"

            on = "1" if params['on'] else "0"
            self._send_command("$A7" + " " + on)
            return True, 'Set all outlets to {}'.format(params['on'])

    @ocs_agent.param('_')
    def acq(self, session, params=None):
        """acq()

        **Process** - Start data acquisition.

        Notes:
            The most recent data collected is stored in session.data in the
            structure::

                >>> response.session['data']
                {"fields":
                    {"0": {"status": 0 or 1, (0: OFF, 1:ON)
                           "name": "outlet1"},
                     "1": {"status": 0 or 1, (0: OFF, 1:ON)
                           "name": "outlet2"},
                     "2": {"status": 0 or 1, (0: OFF, 1:ON)
                           "name": "outlet3"},
                     "3": {"status": 0 or 1, (0: OFF, 1:ON)
                           "name": "outlet4"},
                     "4": {"status": 0 or 1, (0: OFF, 1:ON)
                           "name": "outlet5"},
                    }
                }

        """
        with self.lock.acquire_timeout(timeout=3, job='acq')\
                as acquired:
            if not acquired:
                self.log.warn(
                    'Could not start status acq because {} is already running'
                    .format(self.lock.job))
                return False, 'Could not acquire lock'

            self.take_data = True
            last_release = time.time()

            session.data = {'fields': {}}

            while self.take_data:
                # About once a second, briefly give up the lock so that the
                # tasks (get_status, reboot, ...) get a chance to run.
                if time.time() - last_release > 1.:
                    last_release = time.time()
                    if not self.lock.release_and_acquire(timeout=10):
                        self.log.warn(
                            'Could not re-acquire lock now held by {}.'
                            .format(self.lock.job))
                        return False, 'Could not re-acquire lock (timeout)'

                current_time = time.time()
                data = {'timestamp': current_time,
                        'block_name': 'synaccess_status',
                        'data': {}}
                status_dict = {}

                resp = self.__get_status()
                for i, x in enumerate(resp):
                    status = 1 if x == '1' else 0
                    data['data']['synaccess_%d' % i] = status
                    # _outlet_name avoids an IndexError when fewer names
                    # are configured than status characters were returned.
                    status_dict['%d' % i] = {'status': status,
                                             'name': self._outlet_name(i)}

                self.agent.publish_to_feed('synaccess', data)
                session.data['timestamp'] = current_time
                session.data['fields'] = status_dict

                time.sleep(1)  # DAQ interval
                # End of while loop

            self.agent.feeds['synaccess'].flush_buffer()

        return True, 'Acqusition exited cleanly'

    def stop_acq(self, session, params=None):
        """Request that the ``acq`` process stop after its current iteration.

        Returns:
            tuple: ``(True, message)`` if acquisition was running and has
            been asked to stop, ``(False, message)`` otherwise.
        """
        if self.take_data:
            self.take_data = False
            return True, 'requested to stop taking data'
        return False, 'acq is not currently running'
def make_parser(parser=None):
    """Build the argument parser for the Agent. Allows sphinx to automatically
    build documentation based on this function.

    Args:
        parser (argparse.ArgumentParser, optional): Existing parser to add
            the agent options to. A new one is created when omitted.

    Returns:
        argparse.ArgumentParser: The parser with an 'Agent Options' group
        added.
    """
    if parser is None:
        parser = argparse.ArgumentParser()

    # Add options specific to this agent.
    pgroup = parser.add_argument_group('Agent Options')
    pgroup.add_argument('--ip-address', help='IP address for the device.')
    pgroup.add_argument('--username', help='Username credential to login to device.')
    pgroup.add_argument('--password', help='Password credential to login to device.')
    pgroup.add_argument('--outlet-names', nargs='+', type=str,
                        help="List of outlet names.")
    # type=int so a value given on the command line has the same type as the
    # default; without it the agent receives a string such as '8'.
    pgroup.add_argument('--num-outlets', type=int, default=5,
                        help='Number of outlets for the device.')
    return parser
def main(args=None):
    """Parse the site configuration, build the agent and run it.

    Registers the ``acq`` process (started automatically) and the
    ``get_status``, ``reboot``, ``set_outlet`` and ``set_all`` tasks, then
    hands control to the runner.

    Args:
        args: Argument list forwarded to ``site_config.parse_args``.
    """
    args = site_config.parse_args(
        agent_class='SynaccessAgent', parser=make_parser(), args=args)

    agent, runner = ocs_agent.init_site_agent(args)

    device_options = {
        'ip_address': args.ip_address,
        'username': args.username,
        'password': args.password,
        'outlet_names': args.outlet_names,
        'num_outlets': args.num_outlets,
    }
    pdu = SynaccessAgent(agent, **device_options)

    # Data acquisition starts as soon as the agent comes up.
    agent.register_process('acq', pdu.acq, pdu.stop_acq, startup=True)

    # get_status is also run once at startup; the remaining tasks are
    # registered on demand only, in the same order as before.
    agent.register_task('get_status', pdu.get_status, startup={})
    for task_name in ('reboot', 'set_outlet', 'set_all'):
        agent.register_task(task_name, getattr(pdu, task_name))

    runner.run(agent, auto_reconnect=True)
# Start the agent only when this file is executed as a script.
if __name__ == "__main__":
    main()