#!/usr/bin/env python3
"""OCS agent for stimulator encoder
"""
import argparse
import os
import time
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from socs.agents.stimulator_encoder.drivers import (PATH_LOCK, StimEncReader,
get_path_dev)
class StimEncAgent:
    """OCS agent class for stimulator encoder

    On construction the agent opens the encoder reader and registers two
    feeds: ``stim_enc`` (every encoder sample, excluded from InfluxDB) and
    ``stim_enc_downsampled`` (one summary sample per acquisition-loop
    iteration).

    Parameters
    ----------
    agent : object
        OCS agent instance.  It must provide ``log``, ``register_feed``,
        ``publish_to_feed`` and ``feeds``.
    path_dev : str or pathlib.Path, optional
        Path to the generic-uio device file for str_rd IP.  When ``None``
        (default) the path is obtained from ``get_path_dev()``.
    path_lock : str or pathlib.Path
        Path to the lockfile.
    """

    def __init__(self, agent, path_dev=None, path_lock=PATH_LOCK):
        self.active = True
        self.agent = agent
        self.log = agent.log
        self.lock = TimeoutLock()
        self.take_data = False

        # Auto-detect the device only when no path was supplied.  An
        # explicitly supplied path has to be stored as well, otherwise
        # ``self._path_dev`` would be undefined when the reader is built.
        self._path_dev = get_path_dev() if path_dev is None else path_dev
        self._dev = StimEncReader(self._path_dev, path_lock, verbose=False)

        self.initialized = False

        # Full-rate feed: kept out of InfluxDB.
        agg_params = {'frame_length': 60, 'exclude_influx': True}
        self.agent.register_feed('stim_enc',
                                 record=True,
                                 agg_params=agg_params,
                                 buffer_time=1)

        # Slow feed: one sample per loop iteration.
        agg_params_downsampled = {'frame_length': 60}
        self.agent.register_feed('stim_enc_downsampled',
                                 record=True,
                                 agg_params=agg_params_downsampled,
                                 buffer_time=1)

    def acq(self, session, params):
        """acq()

        **Process** - Start acquiring data.

        Each loop iteration drains the reader FIFO, publishes the drained
        samples to the ``stim_enc`` feed (only when there are any), and
        publishes one summary sample to ``stim_enc_downsampled``.  The same
        summary is mirrored into ``session.data``.

        Returns
        -------
        tuple of (bool, str)
            ``(False, message)`` if the agent lock is already held,
            ``(True, message)`` once the loop has been stopped.

        Notes
        -----
        An example of the session data::

            >>> response.session['data']
            {'timestamp_tai': 1736541833.679634,
             'state': 1,
             'freq': 10.1,
             'timestamp': 1736541796.779634
            }

        """
        # Loop pacing; presumably the argument is a rate in Hz
        # (see the OCS Pacemaker documentation).
        pace_maker = Pacemaker(0.5)

        with self.lock.acquire_timeout(timeout=0, job='acq') as acquired:
            if not acquired:
                self.log.warn(
                    f'Could not start acq because {self.lock.job} is already running')
                return False, 'Could not acquire lock.'

            self.take_data = True
            session.data = {}
            self._dev.run()

            # Last values seen; reported again by the slow feed while no
            # new sample arrives.
            tai_latest = 0.
            en_st_latest = 0
            pulse_count = 0
            time_prev = time.time()

            while self.take_data:
                # Data acquisition
                time_now = time.time()
                data = {'block_name': 'stim_enc', 'data': {}}
                tai_list = []
                ts_list = []
                en_st_list = []

                # Drain everything the reader has queued so far.
                while not self._dev.fifo.empty():
                    _d = self._dev.fifo.get()
                    tai_list.append(_d.time.tai)
                    ts_list.append(_d.utime)
                    en_st_list.append(_d.state)

                if len(tai_list) != 0:
                    data['timestamps'] = ts_list
                    data['data']['timestamps_tai'] = tai_list
                    data['data']['state'] = en_st_list

                    self.agent.publish_to_feed('stim_enc', data)

                    tai_latest = tai_list[-1]
                    en_st_latest = en_st_list[-1]
                    pulse_count += len(en_st_list)

                # Slow feed for InfluxDB and session.data
                # time.time() may return the same value twice or step
                # backwards when the system clock is adjusted; report 0.0
                # instead of raising ZeroDivisionError or publishing a
                # negative frequency.
                elapsed = time_now - time_prev
                freq = pulse_count / elapsed if elapsed > 0 else 0.0

                field_dict = {'timestamp_tai': tai_latest,
                              'state': en_st_latest,
                              'freq': freq,
                              'timestamp': time_now}
                session.data.update(field_dict)

                data_downsampled = {'timestamp': time_now,
                                    'block_name': 'stim_enc_downsampled',
                                    'data': {
                                        'timestamps_tai': tai_latest,
                                        'state': en_st_latest,
                                        'freq': freq
                                    }}
                self.agent.publish_to_feed('stim_enc_downsampled',
                                           data_downsampled)

                # Start a fresh counting window for the next iteration.
                time_prev = time_now
                pulse_count = 0

                pace_maker.sleep()

            self.agent.feeds['stim_enc'].flush_buffer()

        return True, 'Acquisition exited cleanly.'

    def _stop_acq(self, session, params=None):
        """
        Stops the data acquisition.

        Clears ``take_data`` so the ``acq`` loop exits and stops the
        reader.  Returns ``(False, message)`` when no acquisition is
        running.
        """
        if self.take_data:
            self.take_data = False
            self._dev.stop()
            return True, 'requested to stop taking data.'

        return False, 'acq is not currently running.'
def make_parser(parser=None):
    """Build the argument parser carrying the agent's options.

    Parameters
    ----------
    parser : argparse.ArgumentParser, optional
        Parser to extend.  A new one is created when ``None``.

    Returns
    -------
    argparse.ArgumentParser
        The parser with an 'Agent Options' group holding ``--path-dev``
        and ``--path-lock``.
    """
    target = argparse.ArgumentParser() if parser is None else parser

    # (flag, default, help text) for every string-valued agent option.
    option_table = (
        ('--path-dev', None, 'Path to the device file.'),
        ('--path-lock', PATH_LOCK, 'Path to the lock file.'),
    )

    group = target.add_argument_group('Agent Options')
    for flag, default_value, help_text in option_table:
        group.add_argument(flag, default=default_value, type=str,
                           help=help_text)

    return target
def main(args=None):
    """Boot OCS agent

    Parses the site/agent arguments, creates the ``StimEncAgent`` with the
    parsed ``--path-dev`` / ``--path-lock`` values, registers the ``acq``
    process (started automatically) and runs the agent.

    Parameters
    ----------
    args : optional
        Passed through to ``site_config.parse_args``.
    """
    txaio.start_logging(level=os.environ.get('LOGLEVEL', 'info'))

    parser = make_parser()
    args = site_config.parse_args('StimEncAgent',
                                  parser=parser,
                                  args=args)

    agent_inst, runner = ocs_agent.init_site_agent(args)

    # Forward the command-line options; without this the values given for
    # --path-dev and --path-lock would be parsed but never used.
    stim_enc_agent = StimEncAgent(agent_inst,
                                  path_dev=args.path_dev,
                                  path_lock=args.path_lock)

    agent_inst.register_process(
        'acq',
        stim_enc_agent.acq,
        stim_enc_agent._stop_acq,
        startup=True
    )

    runner.run(agent_inst, auto_reconnect=True)
# Script entry point: boot the agent only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    main()