#!/usr/bin/env python3
"""Module to read stimulator encoder.
"""
import fcntl
import mmap
import os
import sys
from pathlib import Path
from queue import Queue
from threading import Thread
from time import sleep, time
import numpy as np
ADDR_AXI = 0x80020000
PATH_DEV_BASE = Path(f'/sys/devices/platform/axi/{ADDR_AXI:08x}.str_rd/uio')
PATH_LOCK = Path('/tmp/').joinpath('.stim-lock')
[docs]
class StimEncError(Exception):
"""
Exception rased by stimulator encoder reader.
"""
[docs]
class StimEncTime:
"""
Stimulator encoder time.
Parameter
---------
time_raw : int
Raw time format from TSU.
[sec 48 bits][nsec 30 bits][sub-nsec 16 bits]
"""
def __init__(self, time_raw: int):
self._time_raw = time_raw
@property
def sec(self) -> int:
"""
Seconds part of timestamp.
Returns
-------
sec : int
Seconds part of timestamp.
"""
return self._time_raw >> 46
@property
def nsec(self) -> int:
"""
Nano second part of timestamp.
Returns
-------
nsec : int
Nano-sec part of timestamp.
"""
return (0x_00000000_00003fff_ffff0000 & self._time_raw) >> 16
@property
def tai(self) -> float:
"""
Time in seconds from TAI epoch.
Returns
-------
tai : float
Seconds from TAI epoch.
"""
return self.sec + (self.nsec / 1e9)
[docs]
class StimEncData:
"""
Stimulator encoder data.
Parameter
---------
data_raw : ndarray
Raw data from PL FIFO.
"""
def __init__(self, data_bytes):
self._data_bytes = data_bytes
self._data_int = int(data_bytes[0]) + (int(data_bytes[1]) << 32) + (int(data_bytes[2]) << 64)
self._utime = time()
@property
def state(self) -> int:
return (self._data_int & 0xC0_00_00_00_00000000_00000000) >> 94
@property
def time_raw(self) -> int:
"""94 bit TSU timestamp.
Returns
-------
time_raw : int
94 bit TSU timestamp.
"""
return self._data_int & 0x3F_FF_FF_FF_FFFFFFFF_FFFFFFFF
@property
def time(self) -> StimEncTime:
"""
TSU timestamp.
Returns
-------
time : StimTime
TSU timestamp abstraction.
"""
return StimEncTime(self.time_raw)
@property
def utime(self) -> float:
"""
Unix timestamp at class creation.
Returns
-------
utime : Unix timestamp when this object is created.
"""
return self._utime
def __str__(self):
return f'time={int(self.time.g3) / 1e8:.8f} data={self.state:02b}'
def get_path_dev() -> Path:
"""
Acquire devicefile path for `str_rd` IP core.
Returns
-------
path_dev : Path
Path to the device file.
"""
if not PATH_DEV_BASE.exists():
raise StimEncError('Device is not found. Check firmware and device tree.')
name_dev = list(PATH_DEV_BASE.glob('uio*'))[0].name
path_dev = Path(f'/dev/{name_dev}')
return path_dev
[docs]
class StimEncReader:
"""Class to read encoder data.
Parameters
----------
path_dev : str or pathlib.Path
Path to the generic-uio device file for str_rd IP.
path_lock : str or pathlib.Path
Path to the lockfile.
"""
def __init__(self, path_dev, path_lock=PATH_LOCK, verbose=True):
# Verbose level
self._verbose = verbose
# Locking
self._fp_lock = open(path_lock, 'w')
try:
fcntl.flock(self._fp_lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
raise StimEncError('locked.')
# Connection
self._path_dev = path_dev
self._dfile = os.open(self._path_dev, os.O_RDONLY | os.O_SYNC)
self._dev = mmap.mmap(self._dfile, 0x100, mmap.MAP_SHARED, mmap.PROT_READ, offset=0)
# Data FIFO
self.fifo: "Queue[StimEncData]" = Queue()
# Runner
self._thread = None
self._running = False
def __del__(self):
if self._running:
self.stop()
fcntl.flock(self._fp_lock, fcntl.LOCK_UN)
self._dev.close()
os.close(self._dfile)
self._eprint('Fin.')
def _eprint(self, errmsg):
if self._verbose:
sys.stderr.write(f'{errmsg}\r\n')
def _get_info(self):
data = np.frombuffer(self._dev, np.uint32, 4, offset=0)
r_len = data[0]
w_len = data[1]
residue = data[2]
return r_len, w_len, residue
def _get_data(self) -> StimEncData:
data = np.frombuffer(self._dev, np.uint32, 4, offset=16)
return StimEncData(data)
[docs]
def fill(self):
"""
Get data from PL fifo and put into software fifo.
"""
while True:
r_len, w_len, residue = self._get_info()
if (r_len == 0) and (residue == 0):
break
self.fifo.put(self._get_data())
def _loop(self):
while self._running:
self.fill()
sleep(0.1)
[docs]
def run(self):
"""
Run infinite loop of data filling.
"""
self._running = True
self._thread = Thread(target=self._loop)
self._thread.start()
def stop(self):
if not self._running:
raise StimEncError('Not started yet.')
self._running = False
self._thread.join()
def main():
"""Main function to boot infinite loop"""
stim_enc = StimEncReader(get_path_dev(), verbose=True)
# Filler loop
fd = open('test.dat', 'w')
stim_enc.run()
while True:
try:
while not stim_enc.fifo.empty():
data = stim_enc.fifo.get()
print(data)
fd.write(str(data) + '\n')
sleep(0.1)
except KeyboardInterrupt:
stim_enc.stop()
fd.close()
break
print('Fin.')
if __name__ == '__main__':
main()