Source code for socs.agents.bluefors.agent

import argparse
import datetime
import glob
import os
import re
import threading
import time

import txaio
from ocs import ocs_agent, site_config

# Notes:
# Each Lakeshore log gets its own "block" due to tracking the logs
# being somewhat tricky. If a block structure is established and
# then the thermometer removed from the scan, say if the user
# switches to scanning a single channel, then the block structure
# won't be reconstructed and thus won't match the existing
# structure, causing an error.

# For logging
txaio.use_twisted()
LOG = txaio.make_logger()


[docs] class LogTracker: """Log Tracking helper class. Always tracks current date's logs. Parameters ---------- log_dir : str Top level log directory Attributes ---------- log_dir : str Top level log directory date : datetime.date Today's date. Used to determine the active log directory file_objects : dict A dictionary with filenames as keys, and another dict as the value. Each of these sub-dictionaries has two keys, "file_object", and "stat_results", with the open file object, and os.stat results as values, respectively. For example:: {'CH6 T 21-05-27.log': {'file_object': <_io.TextIOWrapper name='CH6 T 21-05-27.log' mode='r' encoding='UTF-8'>, 'stat_results': os.stat_result(st_mode=33188, st_ino=1456748, st_dev=65024, st_nlink=1, st_uid=1000, st_gid=1000, st_size=11013, st_atime=1622135813, st_mtime=1622135813, st_ctime=1622135813) } } """ def __init__(self, log_dir): self.log_dir = log_dir self.date = datetime.date.fromtimestamp(time.time()) self.file_objects = {} def _build_file_list(self): """Get list of files to open. Assumes all logs are set to log to same top level directory, i.e. /home/bluefors/logs/ """ date_str = self.date.strftime("%y-%m-%d") file_list = glob.glob("{}/{}/*.log".format(self.log_dir, date_str)) return file_list def _open_file(self, filename): """Open a single file, adding it to self.file_objects. Parameters ---------- filename : str Full path to filename to open """ if filename not in self.file_objects.keys(): print("{} not yet open, opening...".format(filename)) self.file_objects[filename] = {"file_object": open(filename, 'r'), "stat_results": os.stat(filename)} else: pass
[docs] def check_open_files(self): """Check all log files are opened. The Status logs don't always exist, but we want to catch them when they do get generated. This rebuilds the file list and checks all the files in it are in the open file objects dictionary. """ file_list = self._build_file_list() for f in file_list: self._open_file(f)
[docs] def set_active_date(self): """Set the active date to today.""" new_date = datetime.date.fromtimestamp(time.time()) if new_date > self.date: self.close_all_files() self.date = new_date self.open_all_logs()
[docs] def reopen_file(self, filename): """If a file is already open and a new inode is detected, reopen the file. Returns the last line of the file. Parameters ---------- filename : str Full path to filename to open """ self.file_objects[filename] = {"file_object": open(filename, 'r'), "stat_results": os.stat(filename)} lines = self.file_objects[filename]["file_object"].readlines() return lines[-1]
[docs] def open_all_logs(self): """Open today's logs and move to end of files.""" file_list = self._build_file_list() self.file_objects = {} for _file in file_list: self._open_file(_file) for k, v in self.file_objects.items(): v['file_object'].readlines()
[docs] def close_all_files(self): """Close all the files tracked by the LogTracker.""" for k, v in self.file_objects.items(): v['file_object'].close() print("Closed file: {}".format(k)) self.file_objects = {}
[docs] class LogParser: """Log Parsing helper class. Knows the internal formats for each log type. Used to loop over all logs tracked by a LogTracker and publish their contents to an OCS Feed. Parameters ---------- tracker : LogTracker log tracker that contains paths and file objects to parse mode : str Operating mode for the log tracker. Either "follow" or "poll", defaulting to "follow". In "follow" mode the Tracker will read the next line in the file if able to. In "poll" mode stats about the file are used to determine if it was updated since the last read, and if it has been the file is reopened to get the last line. This is more I/O intensive, but is useful in certain configurations. stale_time : int Time in minutes which represents how fresh data in the bluefors logs must be when we open them in order to publish to OCS. This ensures we don't reopen a file much later than when they were collected and publish "stale" data to the OCS live HK system. """ def __init__(self, tracker, mode="follow", stale_time=2): self.log_tracker = tracker self.patterns = {'channels': ['v11', 'v2', 'v1', 'turbo1', 'v12', 'v3', 'v10', 'v14', 'v4', 'v13', 'compressor', 'v15', 'v5', 'hs-still', 'v21', 'v16', 'v6', 'scroll1', 'v17', 'v7', 'scroll2', 'v18', 'v8', 'pulsetube', 'v19', 'v20', 'v9', 'hs-mc', 'ext'], 'status': ['tc400errorcode', 'tc400ovtempelec', 'tc400ovtemppump', 'tc400setspdatt', 'tc400pumpaccel', 'tc400commerr', 'tc400errorcode_2', 'tc400ovtempelec_2', 'tc400ovtemppump_2', 'tc400setspdatt_2', 'tc400pumpaccel_2', 'tc400commerr_2', 'nxdsf', 'nxdsct', 'nxdst', 'nxdsbs', 'nxdstrs', 'ctrl_pres', 'cpastate', 'cparun', 'cpawarn', 'cpaerr', 'cpatempwi', 'cpatempwo', 'cpatempo', 'cpatemph', 'cpalp', 'cpalpa', 'cpahp', 'cpahpa', 'cpadp', 'cpacurrent', 'cpahours', 'cpapscale', 'cpascale', 'cpatscale', 'cpasn', 'cpamodel', 'ctrl_pres_ok', 'ctr_pressure_ok'], 'heater': ["a1_u", "a1_r_lead", "a1_r_htr", "a2_u", "a2_r_lead", "a2_r_htr", "htr", "htr_range"]} self.mode = mode self.stale_time = stale_time
[docs] @staticmethod def timestamp_from_str(time_string): """Convert time string from Bluefors log file into a UNIX timestamp. Parameters ---------- time_string : str String in format "%d-%m-%y,%H:%M:%S" Returns ------- float UNIX timestamp """ dt = datetime.datetime.strptime(time_string, "%d-%m-%y,%H:%M:%S") timestamp = dt.timestamp() return timestamp
def _parse_single_value_log(self, new_line, log_name): """Parses a simple single value log. Valid for LS372 and flowmeter log files. Parameters ---------- new_line : str The new line read by the Parser log_name : str The name of the log, returned by self.identify_log() """ date, _time, data_value = new_line.strip().split(',') time_str = "%s,%s" % (date, _time) timestamp = self.timestamp_from_str(time_str) # Data array to populate data = { 'timestamp': timestamp, 'block_name': log_name, 'data': {} } data['data'][log_name] = float(data_value) return data def _parse_maxigauge_log(self, new_line, log_name): """Parse the maxigauge logs. Parameters ---------- new_line : str The new line read by the Parser log_name : str The name of the log, returned by self.identify_log() """ ts, *channels = new_line.strip().split('CH') time_str = ts.strip(',') timestamp = self.timestamp_from_str(time_str) ch_data = {} for ch in channels: ch_num, _, state, value, *_ = ch.split(',') ch_data["pressure_ch{}_state".format(ch_num)] = int(state) ch_data["pressure_ch{}".format(ch_num)] = float(value) data = { 'timestamp': timestamp, 'block_name': log_name, 'data': ch_data } return data def _parse_multi_value_log(self, new_line, log_type, log_name): """Parse a log containing multiple values on each line. Valid for Channel, Status, and heater logs. Patterns to search for are all known by the parser a class attribute. Parameters ---------- new_line : str The new line read by the Parser log_type : str Log type as identified by self.identify_log, used to select search patterns log_name : str The name of the log, returned by self.identify_log() """ date, _time, *_ = new_line.strip().split(',') time_str = "%s,%s" % (date, _time) timestamp = self.timestamp_from_str(time_str) data_array = {} for pattern in self.patterns[log_type]: regex = re.compile(rf'{pattern},([0-9\.\+\-E]+)') m = regex.search(new_line) # skip patterns that don't exist in this log if not m: continue if log_type == 'channels': data_array[pattern.replace('-', '_')] = int(m.group(1)) else: data_array[pattern] = float(m.group(1)) data = { 'timestamp': timestamp, 'block_name': log_name, 'data': data_array } return data
[docs] @staticmethod def identify_log(filename): """Identify type of log return unique identifier Parameters ---------- filename : str path to file for identification Returns ------- tuple A tuple containing two strings: 1. The log type, i.e. 'lakeshore', 'flowmeter' 2. Unique identifier based on filetype, i.e. 'lakeshore_ch8_p', 'pressure_ch1_state' """ # file type: regex search pattern to identify file_types = {'lakeshore': '(CH[0-9]+ [T,R,P])', 'flowmeter': '(Flowmeter)', 'maxiguage': '(maxigauge)', 'channels': '(Channels)', 'status': '(Status)', 'errors': '(Errors)', 'heater': '(heaters)'} for k, v in file_types.items(): if re.search(v, filename): _type = k m = re.search(file_types[_type], filename) if _type == 'lakeshore': return _type, "{}_{}".format(_type, m.group(0).lower().replace(' ', '_')) else: return _type, m.group(0).lower() # If nothing matches return None return (None, None)
[docs] def read_and_publish_logs(self, app_session): """Read a new line from each log file if there is one, and publish its contents to the app_session's feed. Parameters ---------- app_session : ocs.ocs_agent.OpSession session from the ocs_agent, used to publish to bluefors feed """ for k, v in self.log_tracker.file_objects.items(): log_type, log_name = self.identify_log(k) if os.stat(k).st_ino != v['stat_results'].st_ino and self.mode == "poll": LOG.debug("New inode found, reopening...") new = self.log_tracker.reopen_file(k) LOG.debug("File: {f}, Line: {l}", f=k, l=new) # In a situation with a samba share mounted via sshfs, reading the # nextline didn't reliably work, nor does watching the inode. We'll # also check modification times, which maybe we should just do # instead of the inode check... elif os.stat(k).st_mtime > v['stat_results'].st_mtime and self.mode == "poll": LOG.debug("Modification detected, reopening...") new = self.log_tracker.reopen_file(k) LOG.debug("File: {f}, Line: {l}", f=k, l=new) else: new = v['file_object'].readline() if new == '': continue if log_type in ['lakeshore', 'flowmeter']: data = self._parse_single_value_log(new, log_name) elif log_type == 'maxiguage': data = self._parse_maxigauge_log(new, log_name) elif log_type in ['channels', 'status', 'heater']: data = self._parse_multi_value_log(new, log_type, log_name) elif log_type == 'errors': LOG.info("The Bluefors Agent cannot process error logs. " + "Not publishing.") data = None else: LOG.warn("Warning: Unknown log type. Skipping publish step. " + "This probably shouldn't happen.") LOG.warn("Filename: {}".format(k)) data = None if data is not None: LOG.debug("Data: {d}", d=data) # If the file was reopened due to an inode change we don't know # if the last line is recent enough to be worth publishing. Check if (time.time() - data['timestamp']) < int(self.stale_time) * 60: app_session.app.publish_to_feed('bluefors', data) else: LOG.warn("Not publishing stale data. Make sure your log " + "file sync is done at a rate faster than once ever " + "{x} minutes.", x=self.stale_time)
[docs] class BlueforsAgent: """Agent to track the Bluefors logs generated by an LD400. Parameters ---------- log_directory : str Top level log directory for the Bluefors logs """ def __init__(self, agent, log_directory): self.lock = threading.Semaphore() self.job = None self.log_tracker = LogTracker(log_directory) self.log = agent.log self.agent = agent # Registers bluefors feed agg_params = { 'frame_length': float(os.environ.get("FRAME_LENGTH", 10 * 60)) # [sec] } self.log.debug("frame_length set to {length}", length=agg_params['frame_length']) self.agent.register_feed('bluefors', record=True, agg_params=agg_params, ) def try_set_job(self, job_name): print(self.job, job_name) with self.lock: if self.job is None: self.job = job_name return (True, 'ok') else: return (False, 'Conflict: "%s" is already running.' % self.job) def set_job_done(self): with self.lock: self.job = None
[docs] def acq(self, session, params=None): """acq() **Process** - Monitor and publish data from the Bluefors log files. """ ok, msg = self.try_set_job('acq') if not ok: return ok, msg # Create file objects for all logs in today's directory self.log_tracker.open_all_logs() # Determine parser configuration stale_time = os.environ.get("STALE_TIME", 2) mode = os.environ.get("MODE", "follow") # Setup the Parser object with tracking info parser = LogParser(self.log_tracker, mode, stale_time) while True: with self.lock: if self.job == '!acq': break elif self.job == 'acq': pass else: return 10 # Make sure we're looking at today's logs self.log_tracker.set_active_date() # Ensure all the logs we want are open self.log_tracker.check_open_files() # Check for new lines and publish to feed parser.read_and_publish_logs(session) time.sleep(0.01) self.set_job_done() return True, 'Acquisition exited cleanly.'
def _stop_acq(self, session, params=None): ok = False with self.lock: if self.job == 'acq': self.job = '!acq' ok = True return (ok, {True: 'Requested process stop.', False: 'Failed to request process stop.'}[ok])
def make_parser(parser=None): """Build the argument parser for the Agent. Allows sphinx to automatically build documentation based on this function. """ if parser is None: parser = argparse.ArgumentParser() # Add options specific to this agent. pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--log-directory') return parser def main(args=None): # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) # Setup argument parser parser = make_parser() args = site_config.parse_args(agent_class='BlueforsAgent', parser=parser, args=args) LOG.info('I am following logs located at : %s' % args.log_directory) agent, runner = ocs_agent.init_site_agent(args) bluefors_agent = BlueforsAgent(agent, args.log_directory) agent.register_process('acq', bluefors_agent.acq, bluefors_agent._stop_acq, startup=True) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()