Source code for socs.agents.generator.agent

import argparse
import os
import sys
import time

import txaio
import yaml
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from pyModbusTCP.client import ModbusClient

byteorder = sys.byteorder


def load_configs(dir_name, config_extension='yaml'):
    '''Loads all register configuration files form the specified directory (path).
    The configurations are returned as a list of the configurations from individual files.'''

    config_dir = os.environ.get('OCS_CONFIG_DIR')
    path = os.path.join(config_dir, 'generator_config', dir_name)

    all_configs = []

    ls = os.listdir(path)

    # Make a filter so that we only try to load files ending in config_extension
    def filt(f_name):
        return f_name.endswith(config_extension)
    # Filter
    configs = filter(filt, ls)
    # Load configurations from the remaining files
    for f in configs:
        read_config = os.path.join(path, f)
        with open(read_config) as f:
            try:
                data = yaml.load(f, Loader=yaml.SafeLoader)
            except BaseException:
                pass
            all_configs.append(data)
    return all_configs


def twos(val, bytes):
    '''Take an unsigned integer representation of a two's compilment integer and return the correctly signed integer'''
    b = val.to_bytes(bytes, byteorder=byteorder, signed=False)
    return int.from_bytes(b, byteorder=byteorder, signed=True)


def interp_unsigned_double_reg(r1, r2):
    '''Take two 16 bit register values and combine them assuming we really wanted to read a 32 bit unsigned register'''
    return (r1 << 16) + r2


def interp_signed_double_reg(r1, r2):
    '''Take two 16 bit register values and combine them assuming we really wanted to read a 32 bit signed register'''
    return twos(interp_unsigned_double_reg(r1, r2), 4)


def make_bin_reader(offset, spec):
    '''Read an individual bit or continuous range of bits out of the 2 byte register.
    A single bit can be specified as a single number between 1 and 16, and a range can
    be specified with a dash, i.e. X-Y where both X and Y are in the range
    1 to 16 and X < Y. In either case the bit or range of bits will be returned as an
    integer.'''
    spec = spec.split(' ')[1:]
    spec = spec[0].split('-')
    spec = [int(s) for s in spec]
    if len(spec) == 1:
        # Process individual bit
        spec = spec[0]
        # The mask leaves only the desired bit
        mask = sum([1 << s for s in range(spec - 1, spec)])

        def reader(val):
            return (val[offset] & mask) >> spec - 1
        return reader
    elif len(spec) == 2:
        # Process range
        low = spec[0]
        high = spec[1]
        if low >= high:
            raise ValueError('First bit in range specification must be smaller than last.')
        # The mask leaves only the desired bits
        mask = sum([1 << s for s in range(low - 1, high)])

        def reader(val):
            return (val[offset] & mask) >> low - 1
        return reader
    else:
        raise ValueError('Cannot read binary read_as specification; use single bit or continuous range.')


class ReadBlock(object):
    '''An object for reading, converting, and evaluating information from a single contiouous block of registers'''

    def __init__(self, config, error_out_of_range=True, filter_errors=True):
        self.name = config['block_name']

        try:
            self.read_start = config['read_start']
        except KeyError:
            self.read_start = config['page'] * 256

        self.read_len = config['read_len']
        self.functions = []
        self.rconfig = config['registers']
        self.error_val = None
        self.error_out_of_range = error_out_of_range
        self.filter_errors = filter_errors
        for i in self.rconfig:
            self.functions.append(self.build_reader_function(i, self.rconfig[i]))

    def build_reader_function(self, name, rconfig):
        '''Build and return a closure around a function that converts a specified piece of information from the block'''
        offset = rconfig['offset']

        if rconfig['read_as'] == '16U':
            def evaluator(registers):
                return registers[offset]
        elif rconfig['read_as'] == '16S':
            def evaluator(registers):
                return twos(registers[offset], 2)
        elif rconfig['read_as'] == '32U':
            def evaluator(registers):
                return interp_unsigned_double_reg(registers[offset], registers[offset + 1])
        elif rconfig['read_as'] == '32S':
            def evaluator(registers):
                return interp_signed_double_reg(registers[offset], registers[offset + 1])
        elif 'bin' in rconfig['read_as']:
            evaluator = make_bin_reader(offset, rconfig['read_as'])
        else:
            def evaluator(registers):
                return self.error_val

        def process(registers):
            val = evaluator(registers)
            if 'scale' in rconfig:
                val = val * rconfig['scale']

            if self.error_out_of_range:
                try:
                    if val < rconfig['min_val']:
                        val = self.error_val
                except KeyError:
                    pass

                try:
                    if val > rconfig['max_val']:
                        val = self.error_val
                except KeyError:
                    pass

            if val != self.error_val or not self.filter_errors:
                return {name: {'value': float(val), 'units': rconfig['units']}}
            else:
                return None

        return process

    def read(self, client):
        # Perform the read for the entire block
        registers = client.read_holding_registers(self.read_start, self.read_len)
        return_data = {}
        try:
            # Iterate through the functions that convert and return the individual pieces of
            # data from this block
            for f in self.functions:
                this_data = f(registers)
                if this_data is not None:
                    return_data.update(this_data)
        except Exception as e:
            # print(registers)
            print(f'Error in processing data: {e}')

        return return_data


class Generator:
    """Functions to communite with the Generator controller

    Parameters
    ----------
    host : str
        Address of the generator controller.
    port : int
        Port to generator controller, default to 5021.
    config_dir : string
        Sub-directory of .yaml configuration files that specify blocks of registers to read
        and specifies how to convert them into useable data.
    block_space_time : float
        Amount of time (in seconds) to wait between issuing seperate read_multiple_registers
        commands to the device. DSE device appears to crash without this waiting period.
    close_port : boolean
       Whether or not to close the open port to the device while waiting for the next Pacemaker
       triggered read cycle. The idea here is that closing the port alllows DSEWebNet to function
       in parallel with the agent.

    Attributes
    ----------
    read_blocks : list
        List of ReadBlock objects that represent the different continuous register locations to
        read from as specified in the config files.
    client : ModbusClient
        ModbusClient object that initializes connection
    """

    def __init__(self, host, port, config_dir, block_space_time=.1, close_port=True):
        self.host = host
        self.port = port
        self.read_config = load_configs(config_dir)
        self._build_config()
        self.close_port = close_port
        self.block_space_time = block_space_time
        self.client = ModbusClient(self.host, self.port, auto_open=True, auto_close=False)
        self.client.open()

    def _build_config(self):
        self.read_blocks = []
        for block in self.read_config:
            self.read_blocks.append(ReadBlock(block))

    def read_cycle(self):
        this_cycle_data = {}
        for i, val in enumerate(self.read_blocks):
            data = self._read_regs(val)
            this_cycle_data.update(data)
            time.sleep(self.block_space_time)  # A gap in time is required between individual requests,
            # i.e. a pause between reading each read_multiple_registers command.
        return this_cycle_data

    def _read_regs(self, register_block_object):
        if self.close_port:
            self.client.open()
        try:
            data = register_block_object.read(self.client)
        except Exception as e:
            print('error in read', e)
            if self.close_port:
                self.client.open()
            return
        if self.close_port:
            self.client.open()
        return data


[docs] class GeneratorAgent: """Monitor the Generator controller via ModBus. Parameters ---------- agent : OCSAgent OCSAgent object which forms this Agent configdir : str Directory where .yaml configuration files specific to this generator instance are stored. host : str Address of the generator controller. port : int Port to generator controller, default to 5021. sample_interval : float Time between samples in seconds. Attributes ---------- agent : OCSAgent OCSAgent object which forms this Agent take_data : bool Tracks whether or not the agent is actively issuing SNMP GET commands to the ibootbar. Setting to false stops sending commands. log : txaio.tx.Logger txaio logger object, created by the OCSAgent """ def __init__(self, agent, configdir, host='localhost', port=5021, sample_interval=10.): self.host = host self.port = port self.agent: ocs_agent.OCSAgent = agent self.log = agent.log self.lock = TimeoutLock() self.configdir = configdir self.pacemaker_freq = 1. / sample_interval self.initialized = False self.take_data = False self.generator = None agg_params = { 'frame_length': 10 * 60 # [sec] } self.agent.register_feed('generator', record=True, agg_params=agg_params, buffer_time=0) def _connect(self): """connect() Instantiates Generator object and check if client is open """ self.generator = Generator(self.host, self.port, config_dir=self.configdir) if self.generator.client.is_open: self.initialized = True else: self.initialized = False
[docs] @ocs_agent.param('auto_acquire', default=False, type=bool) def init_generator(self, session, params=None): """init_generator(auto_acquire=False) **Task** - Perform first time setup of the Generator. Parameters: auto_acquire (bool, optional): Starts data acquisition after initialization if True. Defaults to False. """ if self.initialized: return True, "Already initialized." with self.lock.acquire_timeout(3, job='init') as acquired: if not acquired: self.log.warn("Could not start init because " "{} is already running".format(self.lock.job)) return False, "Could not acquire lock." self._connect() if not self.initialized: return False, 'Could not connect to generator' # Start data acquisition if requested if params['auto_acquire']: self.agent.start('acq') return True, 'Generator initialized.'
[docs] @ocs_agent.param('_') def acq(self, session, params=None): """acq() **Process** - Start data acquisition. Notes ----- The most recent data collected is stored in session data in the structure:: >>> response.session['data'] {"fields": {'Oil_pressure': {'value': 100.0, 'units': 'Kpa'}, 'Coolant_temperature': {'value': 20.0, 'units': 'Degrees C'}, 'Oil_temperature': {'value': 20.0, 'units': 'Degrees C'}, 'Fuel_level': {'value': 100.0, 'units': '%'}, 'Charge_alternator_voltage': {'value': 10.0, 'units': 'V'}, 'Engine_Battery_voltage': {'value': 10.0, 'units': 'V'}, 'Engine_speed': {'value': 4000, 'units': 'RPM'}, 'Generator_frequency': {'value': 1.0, 'units': 'Hz'}, ... 'connection': {'last_attempt': 1680812613.939653, 'connected': True}}, "address": 'localhost', "timestamp":1601925677.6914878} """ with self.lock.acquire_timeout(0, job='acq') as acquired: if not acquired: self.log.warn("Could not start acq because {} is already running" .format(self.lock.job)) return False, "Could not acquire lock." self.take_data = True session.data = {"fields": {}} pm = Pacemaker(self.pacemaker_freq) while self.take_data: pm.sleep() current_time = time.time() data = { 'timestamp': current_time, 'connection': {}, 'block_name': 'registers', 'data': {} } if not self.generator.client.is_open: self.initialized = False # Try to re-initialize if connection lost if not self.initialized: self._connect() # Only get readings if connected if self.initialized: session.data.update({'connection': {'last_attempt': time.time(), 'connected': True}}) session.data['address'] = self.host regdata = self.generator.read_cycle() if regdata: for reg in regdata: data['data'][reg] = regdata[reg]["value"] field_dict = {reg: regdata[reg]['value']} session.data['fields'].update(field_dict) session.data.update({'timestamp': current_time}) else: self.log.info('Connection error or error in processing data.') self.initialized = False # Continue trying to connect if not self.initialized: session.data.update({'connection': {'last_attempt': time.time(), 'connected': False}}) self.log.info('Trying to reconnect.') continue for field, val in data['data'].items(): _data = { 'timestamp': current_time, 'block_name': field, 'data': {field: val} } self.agent.publish_to_feed('generator', _data) self.agent.feeds['generator'].flush_buffer() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None): """ Stops acq process. """ if self.take_data: self.take_data = False return True, 'requested to stop taking data.' else: return False, 'acq is not currently running'
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument("--host", help="Address to listen to.") pgroup.add_argument("--port", default=5021, help="Port to listen on.") pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'], help="Starting action for the agent.") pgroup.add_argument("--configdir", type=str, help="Path to directory containing .yaml config files.") pgroup.add_argument("--sample-interval", type=float, default=10., help="Time between samples in seconds.") return parser def main(args=None): # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) parser = make_parser() # Interpret options in the context of site_config. args = site_config.parse_args(agent_class='GeneratorAgent', parser=parser, args=args) # Automatically acquire data if requested (default) init_params = False if args.mode == 'init': init_params = {'auto_acquire': False} elif args.mode == 'acq': init_params = {'auto_acquire': True} agent, runner = ocs_agent.init_site_agent(args) p = GeneratorAgent(agent, configdir=args.configdir, host=args.host, port=int(args.port), sample_interval=args.sample_interval) agent.register_task('init_generator', p.init_generator, startup=init_params) agent.register_process('acq', p.acq, p._stop_acq) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()