Source code for socs.agents.ifm_sbn246_flowmeter.agent

import argparse
import time
from os import environ

import requests
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock


def extract(value):
    """
    Extract flow and temp from raw hexidecimal value from SBN246 Process Data.

    Parameters
    ----------
    value : str
        Hexidecimal value from SBN246 Process Data.

    Returns
    -------
    float, int
        The flow in units of gallons per minute (gpm) and temperature in units
        of F.

    """
    binary = bin(int(value, 16))[2:].zfill(32)
    _b_flow = binary[0:16]
    _b_temp = binary[17:30]

    flow = int(_b_flow, 2) / 10
    temp = int(_b_temp, 2)

    return flow, temp


[docs] class FlowmeterAgent: """ Monitor the flowmeter. Parameters ---------- agent : OCSAgent OCSAgent object which forms this Agent. ip_address: str IP address of IO-Link master to make requests from. daq_port: int Port on IO-Link master that connects to flowmeter. Choices are 1-4. """ def __init__(self, agent, ip_address, daq_port): self.agent = agent self.log = agent.log self.lock = TimeoutLock() self.ip_address = ip_address self.daq_port = daq_port # check serial number of flowmeter, otherwise may seriously # impact understanding of our flow and temp (converted into metric) prod_adr = "/iolinkmaster/port[{}]/iolinkdevice/productname/getdata".format(self.daq_port) q = requests.post('http://{}'.format(self.ip_address), json={"code": "request", "cid": -1, "adr": prod_adr}) assert q.json()['data']['value'] == 'SBN246', "Flowmeter device is not an SBN246 model" self.take_data = False agg_params = {'frame_length': 60, 'exclude_influx': False} # register the feed self.agent.register_feed('flowmeter', record=True, agg_params=agg_params, buffer_time=1 )
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def acq(self, session, params=None): """ acq(test_mode=False) **Process** - Fetch values from the flowmeter using the IO-Link master. Parameters ---------- test_mode : bool, optional Run the Process loop only once. Meant only for testing. Default is False. Notes ----- The most recent data collected is stored in session data in the following structure. Note the units are [liters/min] and [Celsius]:: >>> response.session['data'] {'timestamp': 1682630863.0066128, 'fields': {'flow': 42.4, 'temperature': 22.8} } """ pm = Pacemaker(1, quantize=True) self.take_data = True while self.take_data: pm.sleep() dp = int(self.daq_port) adr = "/iolinkmaster/port[{}]/iolinkdevice/pdin/getdata".format(dp) url = 'http://{}'.format(self.ip_address) try: r = requests.post(url, json={"code": "request", "cid": -1, "adr": adr}) except requests.exceptions.ConnectionError as e: self.log.warn(f"Connection error occured: {e}") continue value = r.json()['data']['value'] flow_gpm, temp_f = extract(value) # units [gallons/minute], [F] flow = flow_gpm * 3.785411784 # liters/minute flow = round(flow, 1) temp = (temp_f - 32) * (5 / 9) # Celsius temp = round(temp, 1) now = time.time() data = {'block_name': 'flowmeter', 'timestamp': now, 'data': {'flow': flow, 'temperature': temp} } self.agent.publish_to_feed('flowmeter', data) session.data = {"timestamp": now, "fields": {}} session.data['fields']['flow'] = flow session.data['fields']['temperature'] = temp if params['test_mode']: break return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None): """ Stops acq process. """ self.take_data = False return True, 'Stopping acq process'
def add_agent_args(parser_in=None): if parser_in is None: parser_in = argparse.ArgumentParser() pgroup = parser_in.add_argument_group('Agent Options') pgroup.add_argument("--ip-address", type=str, help="IP address of IO-Link master.") pgroup.add_argument("--daq-port", type=int, help="Port on IO-Link master that flowmeter is connected to.") return parser_in def main(args=None): # For logging txaio.use_twisted() txaio.make_logger() txaio.start_logging(level=environ.get("LOGLEVEL", "info")) parser = add_agent_args() args = site_config.parse_args(agent_class='FlowmeterAgent', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) f = FlowmeterAgent(agent, args.ip_address, args.daq_port) agent.register_process('acq', f.acq, f._stop_acq, startup=True) runner.run(agent, auto_reconnect=True) if __name__ == "__main__": main()